Re: [pyspark delta] [delta][Spark SQL]: Getting an Analysis Exception. The associated location (path) is not empty

2022-08-01 Thread Sean Owen
Pretty much what it says: you are creating a table over a path that already
has data in it. You can't do that without mode=overwrite, at least if that's
what you intend.
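A minimal sketch of two ways to resolve this, assuming the path and table names
from the question (df here is a hypothetical DataFrame holding the parsed rows;
the original code never builds one):

    OUTPUT_DELTA_PATH = './output/delta/'

    # Option 1: overwrite whatever Delta data already sits at the location
    df.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("worked_date") \
        .save(OUTPUT_DELTA_PATH)

    # Option 2: replace the table definition (supported in newer Delta releases)
    spark.sql("""
        CREATE OR REPLACE TABLE EXERCISE.WORKED_HOURS (
            worked_date date, worker_id int, delete_flag string, hours_worked double
        ) USING DELTA
        PARTITIONED BY (worked_date)
        LOCATION '{0}'
    """.format(OUTPUT_DELTA_PATH))

Simply deleting the non-empty ./output/delta/ directory before running the
CREATE TABLE would also clear the error.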

On Mon, Aug 1, 2022 at 7:29 PM Kumba Janga  wrote:

>
>
>- Component: Spark Delta, Spark SQL
>- Level: Beginner
>- Scenario: Debug, How-to
>
> *Python in Jupyter:*
>
> import pyspark
> import pyspark.sql.functions
>
> from pyspark.sql import SparkSession
> spark = (
> SparkSession
> .builder
> .appName("programming")
> .master("local")
> .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
> .config("spark.sql.extensions", 
> "io.delta.sql.DeltaSparkSessionExtension")
> .config("spark.sql.catalog.spark_catalog", 
> "org.apache.spark.sql.delta.catalog.DeltaCatalog")
> .config('spark.ui.port', '4050')
> .getOrCreate()
>
> )
> from delta import *
>
> string_20210609 = '''worked_date,worker_id,delete_flag,hours_worked
> 2021-06-09,1001,Y,7
> 2021-06-09,1002,Y,3.75
> 2021-06-09,1003,Y,7.5
> 2021-06-09,1004,Y,6.25'''
>
> rdd_20210609 = spark.sparkContext.parallelize(string_20210609.split('\n'))
>
> # FILES WILL SHOW UP ON THE LEFT UNDER THE FOLDER ICON IF YOU WANT TO BROWSE THEM
> OUTPUT_DELTA_PATH = './output/delta/'
>
> spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')
>
> spark.sql('''
> CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
> worked_date date
> , worker_id int
> , delete_flag string
> , hours_worked double
> ) USING DELTA
>
>
> PARTITIONED BY (worked_date)
> LOCATION "{0}"
> '''.format(OUTPUT_DELTA_PATH)
> )
>
> *Error Message:*
>
> AnalysisException                          Traceback (most recent call last)
> <ipython-input> in <module>
>       4 spark.sql('CREATE DATABASE IF NOT EXISTS EXERCISE')
>       5
> ----> 6 spark.sql('''
>       7 CREATE TABLE IF NOT EXISTS EXERCISE.WORKED_HOURS(
>       8     worked_date date
>
> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\session.py in sql(self, sqlQuery)
>     647         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
>     648         """
> --> 649         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
>     650
>     651     @since(2.0)
>
> \Users\kyjan\spark-3.0.3-bin-hadoop2.7\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
>    1302
>    1303         answer = self.gateway_client.send_command(command)
> -> 1304         return_value = get_return_value(
>    1305             answer, self.gateway_client, self.target_id, self.name)
>    1306
>
> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
>     132                 # Hide where the exception came from that shows a non-Pythonic
>     133                 # JVM exception message.
> --> 134                 raise_from(converted)
>     135             else:
>     136                 raise
>
> /Users/kyjan/spark-3.0.3-bin-hadoop2.7\python\pyspark\sql\utils.py in raise_from(e)
>
> AnalysisException: Cannot create table ('`EXERCISE`.`WORKED_HOURS`'). The associated location ('output/delta') is not empty.;
>
>
> --
> Best Wishes,
> Kumba Janga
>
> "The only way of finding the limits of the possible is by going beyond
> them into the impossible"
> -Arthur C. Clarke
>


Re: WARN: netlib.BLAS

2022-08-01 Thread Sean Owen
Hm, I think the problem is either that you need to build the
spark-ganglia-lgpl module into your Spark distro, or it's the pomOnly() part
of your build: you need the netlib code actually in your app, not just its
pom. Yes, you need OpenBLAS too.

On Mon, Aug 1, 2022 at 7:36 AM 陈刚  wrote:

> Dear expert,
>
>
> I'm using spark-3.1.1 mllib, and I got this on CentOS 7.6:
>
>
> 22/08/01 09:42:34 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeSystemBLAS
> 22/08/01 09:42:34 WARN netlib.BLAS: Failed to load implementation from:
> com.github.fommil.netlib.NativeRefBLAS
>
>
> I used
>
> yum -y install openblas*
>
> yum -y install blas
>
> to install blas
>
> and added
>
>
> // https://mvnrepository.com/artifact/com.github.fommil.netlib/all
> libraryDependencies += "com.github.fommil.netlib" % "all" % "1.1.2"
> pomOnly()
>
> in the sbt file.
>
>
> But I still got the WARN.
>
>
> Please help me!
>
>
> Best
>
>
> Gang Chen
>


Re: Spark Avro Java 17 Compatibility

2022-07-27 Thread Sean Owen
See the documentation at spark.apache.org. Spark 2.4 definitely does not
support versions after Java 8; Spark 3.3 supports Java 17.
(General note to anyone mailing the list: don't use a ".invalid" reply-to
address.)

On Wed, Jul 27, 2022 at 7:47 AM Shivaraj Sivasankaran
 wrote:

> Gentle Reminder on the below query.
>
>
>
> Regards,
>
> Shivaraj Sivasankaran.
>
>
>
> *From:* Shivaraj Sivasankaran
> *Sent:* Tuesday, March 29, 2022 11:54 AM
> *To:* user@spark.apache.org
> *Cc:* M Vasanthakumar 
> *Subject:* Spark Avro Java 17 Compatibility
>
>
>
> Hi,
>
>
>
>
>
> I am Shivaraj from Ericsson India Global Private Limited. We have our own
> software to meet our business needs, and we use the open-source library below
> to achieve certain functionality. Since we plan to upgrade our existing Java
> runtime environment to Java 17, we need to know whether the component below
> can run on top of Java 17. Please let us know the plan/roadmap for Java 17
> support; a matrix of components and their supported Java versions (including
> Java 17) would be helpful.
>
>
>
> Vendor    Software Name    Version
> Apache    Spark Avro       2.4.4
>
> Regards,
>
> Shivaraj Sivasankaran.
>


Re: Updating Broadcast Variable in Spark Streaming 2.4.4

2022-07-22 Thread Sean Owen
I think you're taking the right approach, trying to create a new broadcast
var. What part doesn't work? For example, I wonder whether comparing Map
equality like that does what you think; isn't it just reference equality?
Debug a bit more to see whether it even destroys and recreates the broadcast
in your code.

On Fri, Jul 22, 2022 at 4:24 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Hi folks!
>
> I'm trying to implement an update of a broadcast var in Spark Streaming.
> The idea is that whenever some configuration value has changed (this is
> periodically checked by the driver) the existing broadcast variable is
> unpersisted and then (re-)broadcasted.
>
> In a local test setup (using a local Spark) it works fine but on a real
> cluster it doesn't work. The broadcast variable never gets updated. Am I
> doing something wrong? Or is this simply not possible? Or a bug?
>
> Code snippet:
>
> @RequiredArgsConstructor
> public class BroadcastUpdater implements Runnable {
>     private final transient JavaSparkContext sparkContext;
>     @Getter
>     private transient volatile Broadcast<Map<String, String>> broadcastVar;
>     private transient Map<String, String> configMap;
>
>     public void run() {
>         Map<String, String> configMap = getConfigMap();
>         if (this.broadcastVar == null || !configMap.equals(this.configMap)) {
>             this.configMap = configMap;
>             if (broadcastVar != null) {
>                 broadcastVar.unpersist(true);
>                 broadcastVar.destroy(true);
>             }
>             this.broadcastVar = this.sparkContext.broadcast(this.configMap);
>         }
>     }
>
>     private Map<String, String> getConfigMap() {
>         // impl details
>     }
> }
>
> public class StreamingFunction implements Serializable {
>
>     private transient volatile BroadcastUpdater broadcastUpdater;
>
>     protected JavaStreamingContext startStreaming(JavaStreamingContext context,
>             ConsumerStrategy<String, ChangeDataRecord> consumerStrategy) {
>         broadcastUpdater = new BroadcastUpdater(context.sparkContext());
>         ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
>         scheduledThreadPoolExecutor.scheduleWithFixedDelay(broadcastUpdater, 0, 3, TimeUnit.SECONDS);
>
>         final JavaInputDStream<ConsumerRecord<String, ChangeDataRecord>> inputStream =
>                 KafkaUtils.createDirectStream(context,
>                         LocationStrategies.PreferConsistent(), consumerStrategy);
>
>         inputStream.foreachRDD(rdd -> {
>             Broadcast<Map<String, String>> broadcastVar = broadcastUpdater.getBroadcastVar();
>             rdd.foreachPartition(partition -> {
>                 if (partition.hasNext()) {
>                     Map<String, String> configMap = broadcastVar.getValue();
>
>                     // iterate
>                     while (partition.hasNext()) {
>                         // impl logic using broadcast variable
>                     }
>                 }
>             });
>         });
>     }
> }
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [MLlib] Differences after version upgrade

2022-07-20 Thread Sean Owen
How different? I think quite small variations are to be expected.

On Wed, Jul 20, 2022 at 9:13 AM Roger Wechsler  wrote:

> Hi!
>
> We've been using Spark 3.0.1 to train Logistic regression models
> with MLLIb.
> We've recently upgraded to Spark 3.3.0 without making any other code
> changes and noticed that the trained models are different as compared to
> the ones trained with 3.0.1 and therefore behave differently when used for
> prediction.
>
> We went through the release notes and through the API changes to see if
> the default behaviour changed, but we could not find anything. Do you know
> what changed between version 3.0.1 and 3.3.0? And if so, how could we
> guarantee the same behaviour as in 3.0.1?
>
> Thanks for your help!
>
> Roger
>


Re: Building a ML pipeline with no training

2022-07-20 Thread Sean Owen
The data transformation is all the same.
Sure, linear regression is easy:
https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression
These are components that operate on DataFrames.

You'll want to look at VectorAssembler to prepare data into an array column.
There are other transformations you may want like normalization, also in
the Spark ML components.
You can put those steps together into a Pipeline to fit and transform with
it as one unit.
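As a rough illustration of those pieces together (the column names and the
train_df/test_df DataFrames are made up; they stand for data you already build
with Spark SQL):

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler, StandardScaler
    from pyspark.ml.regression import LinearRegression

    # assemble raw numeric columns into a single vector column
    assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="raw_features")
    # optional normalization step
    scaler = StandardScaler(inputCol="raw_features", outputCol="features")
    lr = LinearRegression(featuresCol="features", labelCol="label")

    pipeline = Pipeline(stages=[assembler, scaler, lr])
    model = pipeline.fit(train_df)          # fit the whole pipeline as one unit
    predictions = model.transform(test_df)  # adds a "prediction" column

The fitted PipelineModel can be saved and reloaded like any other Spark ML
model, so deploying it is not very different from deploying the rest of a
Spark SQL job.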

On Wed, Jul 20, 2022 at 3:04 AM Edgar H  wrote:

> Morning everyone,
>
> The question may seem to broad but will try to synth as much as possible:
>
> I'm used to work with Spark SQL, DFs and such on a daily basis, easily
> grouping, getting extra counters and using functions or UDFs. However, I've
> come to an scenario where I need to make some predictions and linear
> regression is the way to go.
>
> However, lurking through the docs this belongs to the ML side of Spark and
> never been in there before...
>
> How is it working with Spark ML compared to what I'm used to? Training
> models, building a new one, adding more columns and such... Is there even a
> change or I'm just confused and it's pretty easy?
>
> When deploying ML pipelines, is there anything to take into account
> compared to the usual ones with Spark SQL and such?
>
> And... Is it even possible to do linear regression (or any other ML
> method) inside a traditional pipeline without training or any other ML
> related aspects?
>
> Some guidelines (or articles, ref to docs) would be helpful to start if
> possible.
>
> Thanks!
>


Re: [Building] Building with JDK11

2022-07-18 Thread Sean Owen
Why do you need Java 11 bytecode though?
Java 8 bytecode runs fine on Java 11. The settings in the build are really
there for testing, not because it's required to use Java 11.

On Mon, Jul 18, 2022 at 10:29 PM Gera Shegalov  wrote:

> Bytecode version is controlled by javac "-target" option for Java, and by
> scalac "-target:" for Scala
> JDK can cross-compile between known versions.
>
> Spark uses 1.8 as source and target by default, controlled by the Maven
> property java.version. But it's also hard-coded with -target:jvm-1.8 for
> Scala. Higher JDK versions can run lower-version bytecode.
>
> If you want to try 11, replace occurrences of -target:jvm-1.8 with
> -target:jvm-${java.version} in pom.xml; you should then be able to produce 11
> bytecode by adding -Djava.version=11 to your Maven build command.
>
> ./build/mvn -Djava.version=11  ...
>
> However, I did not try beyond a quick compile on core and cannot say
> anything about fallout implications at run time.
>


Re: very simple UI on webpage to display x/y plots+histogram of data stored in hive

2022-07-18 Thread Sean Owen
Sure, look at any python-based plotting package. plot.ly does this nicely.
You pull your data via Spark to a pandas DF and do whatever you want.
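For example (a sketch only; the Hive table and column names are invented),
using plotly:

    import plotly.express as px

    pdf = spark.sql("SELECT x, y FROM my_hive_table WHERE day = '2022-07-18'").toPandas()

    px.scatter(pdf, x="x", y="y").show()
    px.histogram(pdf, x="y").show()

Plotly figures can also be written out as standalone HTML with
fig.write_html("plot.html"), which is often enough for a short demo page.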

On Mon, Jul 18, 2022 at 1:42 PM Joris Billen 
wrote:

> Hi,
> I am making a very short demo and would like to make the most rudimentary
> UI (withouth knowing anything about front end) that would show a x/y plot
> of data stored in HIVE (that I typically query with spark) together with a
> histogram (something one would typically created in a jupyter notebook).
> Without being a front-end developer, does something exists to publish
> something on a html page, where the user selects something (for instance
> out of a list a certain date), and then in the browser, the store x/y plot
> based on hive tables and a histogram is returned?
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Issue while building spark project

2022-07-18 Thread Sean Owen
Increase the stack size for the JVM when Maven / SBT run. The build sets
this but you may still need something like "-Xss4m" in your MAVEN_OPTS

On Mon, Jul 18, 2022 at 11:18 AM rajat kumar 
wrote:

> Hello ,
>
> Can anyone pls help me in below error. It is a maven project. It is coming
> while building it
>
> [ERROR] error: java.lang.StackOverflowError
> [INFO] at
> scala.tools.nsc.typechecker.Typers$Typer.typedApply$1(Typers.scala:4885)
>


CVE-2022-33891: Apache Spark shell command injection vulnerability via Spark UI

2022-07-17 Thread Sean Owen
Severity: important

Description:

The Apache Spark UI offers the possibility to enable ACLs via the
configuration option spark.acls.enable. With an authentication filter, this
checks whether a user has access permissions to view or modify the
application. If ACLs are enabled, a code path in HttpSecurityFilter can
allow someone to perform impersonation by providing an arbitrary user name.
A malicious user might then be able to reach a permission check function
that will ultimately build a Unix shell command based on their input, and
execute it. This will result in arbitrary shell command execution as the
user Spark is currently running as. This affects Apache Spark versions
3.0.3 and earlier, versions 3.1.1 to 3.1.2, and versions 3.2.0 to 3.2.1.

This issue is being tracked as SPARK-38992

Mitigation:

Upgrade to supported Apache Spark maintenance release 3.1.3, 3.2.2, or
3.3.0 or later

Credit:

 Kostya Kortchinsky (Databricks)


Re: [EXTERNAL] RDD.pipe() for binary data

2022-07-16 Thread Sean Owen
Use GraphFrames?

On Sat, Jul 16, 2022 at 3:54 PM Yuhao Zhang  wrote:

> Hi Shay,
>
> Thanks for your reply! I would very much like to use pyspark. However, my
> project depends on GraphX, which is only available in the Scala API as far
> as I know. So I'm locked with Scala and trying to find a way out. I wonder
> if there's a way to go around it.
>
> Best regards,
> Yuhao Zhang
>
>
> On Sun, Jul 10, 2022 at 5:36 AM Shay Elbaz  wrote:
>
>> Yuhao,
>>
>>
>> You can use pyspark as entrypoint to your application. With py4j you can
>> call Java/Scala functions from the python application. There's no need to
>> use the pipe() function for that.
>>
>>
>> Shay
>> --
>> *From:* Yuhao Zhang 
>> *Sent:* Saturday, July 9, 2022 4:13:42 AM
>> *To:* user@spark.apache.org
>> *Subject:* [EXTERNAL] RDD.pipe() for binary data
>>
>>
>>
>>
>> Hi All,
>>
>> I'm currently working on a project involving transferring between  Spark
>> 3.x (I use Scala) and a Python runtime. In Spark, data is stored in an RDD
>> as floating-point number arrays/vectors and I have custom routines written
>> in Python to process them. On the Spark side, I also have some operations
>> specific to Spark Scala APIs, so I need to use both runtimes.
>>
>> Now to achieve data transfer I've been using the RDD.pipe() API, by 1.
>> converting the arrays to strings in Spark and calling RDD.pipe(script.py)
>> 2. Then Python receives the strings and casts them as Python's data
>> structures and conducts operations. 3. Python converts the arrays into
>> strings and prints them back to Spark. 4. Spark gets the strings and cast
>> them back as arrays.
>>
>> Needless to say, this feels unnatural and slow to me, and there are some
>> potential floating-point number precision issues, as I think the floating
>> number arrays should have been transmitted as raw bytes. I found no way to
>> use the RDD.pipe() for this purpose, as written in
>> https://github.com/apache/spark/blob/3331d4ccb7df9aeb1972ed86472269a9dbd261ff/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L139,
>> .pipe() seems to be locked with text-based streaming.
>>
>> Can anyone shed some light on how I can achieve this? I'm trying to come
>> up with a way that does not involve modifying the core Spark myself. One
>> potential solution I can think of is saving/loading the RDD as binary files
>> but I'm hoping to find a streaming-based solution. Any help is much
>> appreciated, thanks!
>>
>>
>> Best regards,
>> Yuhao
>>
>


Re: [Building] Building with JDK11

2022-07-15 Thread Sean Owen
Java 8 binaries are probably on your PATH

On Fri, Jul 15, 2022, 5:01 PM Szymon Kuryło  wrote:

> Hello,
>
> I'm trying to build a Java 11 Spark distro using the
> dev/make-distribution.sh script.
> I have set JAVA_HOME to point to JDK11 location, I've also set the
> java.version property in pom.xml to 11, effectively also setting
> `maven.compile.source` and `maven.compile.target`.
> When inspecting classes from the `dist` directory with `javap -v`, I find
> that the class major version is 52, which is specific to JDK8. Am I missing
> something? Is there a reliable way to set the JDK used in the build process?
>
> Thanks,
> Szymon K.
>


Re: Spark (K8S) IPv6 support

2022-07-14 Thread Sean Owen
I don't know about the state of IPv6 support, but yes you're right in
guessing that 3.4.0 might be released perhaps early next year.
You can always clone the source repo and build it!

On Thu, Jul 14, 2022 at 2:19 PM Valer  wrote:

> Hi,
>
> We're starting to use IPv6-only K8S cluster (EKS) which currently breaks
> spark. I've noticed SPARK-39457 contains
> a lot of focus on this, where all the sub-tasks seem to be done and
> indicates this should come in 3.4.0, so I'd like to ask a couple of
> questions:
>
>
>- Is 3.4.0 supposed to fully support IPv6 ?
>- When should I roughly expect it to be released? I've noticed that
>3.2 released in October and 3.3 this June. Is this a somewhat stable
>release frequency (half-yearly)?
>- Is there any way currently to download a tarball with the "master" /
>"latest" version that we could run before releasing ? The apache archive
>only has actual semver'd releases.
>
>
> Thanks in advance :)
>
> Regards,
> *Valér*
>


Re: about cpu cores

2022-07-10 Thread Sean Owen
Jobs consist of tasks, each of which consumes a core (can be set to >1 too,
but that's a different story). If there are more tasks ready to execute
than available cores, some tasks simply wait.

On Sun, Jul 10, 2022 at 3:31 AM Yong Walt  wrote:

> given my spark cluster has 128 cores totally.
> If the jobs (each job was assigned only one core) I submitted to the
> cluster are over 128, what will happen?
>
> Thank you.
>


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-02 Thread Sean Owen
I think that is more accurate, yes. Though shuffle files are local, not on
distributed storage, which is an advantage. MR also had map-only transforms
and chained mappers, but they were harder to use. Not impossible, but you
could also say Spark just made it easier to do the more efficient thing.

On Sat, Jul 2, 2022, 9:34 AM krexos  wrote:

>
> You said Spark performs IO only when reading data and writing final data
> to the disk. I though by that you meant that it only reads the input files
> of the job and writes the output of the whole job to the disk, but in
> reality spark does store intermediate results on disk, just in less places
> than MR
>
> --- Original Message ---
> On Saturday, July 2nd, 2022 at 5:27 PM, Sid 
> wrote:
>
> I have explained the same thing in a very layman's terms. Go through it
> once.
>
> On Sat, 2 Jul 2022, 19:45 krexos,  wrote:
>
>>
>> I think I understand where Spark saves IO.
>>
>> in MR we have map -> reduce -> map -> reduce -> map -> reduce ...
>>
>> which writes results do disk at the end of each such "arrow",
>>
>> on the other hand in spark we have
>>
>> map -> reduce + map -> reduce + map -> reduce ...
>>
>> which saves about 2 times the IO
>>
>> thanks everyone,
>> krexos
>>
>> --- Original Message ---
>> On Saturday, July 2nd, 2022 at 1:35 PM, krexos 
>> wrote:
>>
>> Hello,
>>
>> One of the main "selling points" of Spark is that unlike Hadoop
>> map-reduce that persists intermediate results of its computation to HDFS
>> (disk), Spark keeps all its results in memory. I don't understand this as
>> in reality when a Spark stage finishes it writes all of the data into
>> shuffle files stored on the disk.
>> How then is this an improvement on map-reduce?
>>
>> Image from https://youtu.be/7ooZ4S7Ay6Y
>>
>>
>> thanks!
>>
>>
>>
>


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-02 Thread Sean Owen
You're right. I suppose I just mean most operations don't need a shuffle -
you don't have 10 stages for 10 transformations. Also: caching in memory is
another way that memory is used to avoid IO.

On Sat, Jul 2, 2022, 8:42 AM krexos  wrote:

> This doesn't add up with what's described in the internals page I
> included. What you are talking about is shuffle spills at the beginning of
> the stage. What I am talking about is that at the end of the stage spark
> writes all of the stage's results to shuffle files on disk, thus we will
> have the same amount of IO writes as there are stages.
>
> thanks,
> krexos
>
> --- Original Message ---
> On Saturday, July 2nd, 2022 at 3:34 PM, Sid 
> wrote:
>
> Hi Krexos,
>
> If I understand correctly, you are trying to ask that even spark involves
> disk i/o then how it is an advantage over map reduce.
>
> Basically, Map Reduce phase writes every intermediate results to the disk.
> So on an average it involves 6 times disk I/O whereas spark(assuming it has
> an enough memory to store intermediate results) on an average involves 3
> times less disk I/O i.e only while reading the data and writing the final
> data to the disk.
>
> Thanks,
> Sid
>
> On Sat, 2 Jul 2022, 17:58 krexos,  wrote:
>
>> Hello,
>>
>> One of the main "selling points" of Spark is that unlike Hadoop
>> map-reduce that persists intermediate results of its computation to HDFS
>> (disk), Spark keeps all its results in memory. I don't understand this as
>> in reality when a Spark stage finishes it writes all of the data into
>> shuffle files stored on the disk.
>> How then is this an improvement on map-reduce?
>>
>> Image from https://youtu.be/7ooZ4S7Ay6Y
>>
>>
>> thanks!
>>
>
>


Re: How is Spark a memory based solution if it writes data to disk before shuffles?

2022-07-02 Thread Sean Owen
Because only shuffle stages write shuffle files. Most stages are not
shuffles

On Sat, Jul 2, 2022, 7:28 AM krexos  wrote:

> Hello,
>
> One of the main "selling points" of Spark is that unlike Hadoop map-reduce
> that persists intermediate results of its computation to HDFS (disk), Spark
> keeps all its results in memory. I don't understand this as in reality when
> a Spark stage finishes it writes all of the data into shuffle files
> stored on the disk.
> How then is this an improvement on map-reduce?
>
> Image from https://youtu.be/7ooZ4S7Ay6Y
>
>
> thanks!
>


Re: Spark Group How to Ask

2022-07-01 Thread Sean Owen
Yes, user@spark.apache.org. This incubator address hasn't been used in
about 8 years.

On Fri, Jul 1, 2022 at 10:24 AM Zehra Günindi
 wrote:

> Hi,
>
> Is there any group for asking question related to Apache Spark?
>
>
> Sincerely,
> Zehra
>
> obase
> TEL: +90216 527 30 00
> FAX: +90216 527 31 11
>


Re: Follow up on Jira Issue 39549

2022-06-24 Thread Sean Owen
No, programs can't read information out of other processes' memory; this is
true of all software. A cached DataFrame is tied to a Spark application, but
many things could be running inside that app. You may be thinking of something
like exposing a service on the driver that responds to requests and returns
results out of memory. Sure, you could do that. People usually use a service
that uses Spark as a SQL engine for things like that.

On Fri, Jun 24, 2022 at 1:17 PM Chenyang Zhang  wrote:

> Thanks for response. I want to figure out is there a way to share data
> without writing it to disk because of performance issues.
>
>
>
> *Chenyang Zhang*
> Software Engineering Intern, Platform
> Redwood City, California
> © 2022 C3.ai. Confidential Information.
>
> On Jun 24, 2022, at 11:15 AM, Sean Owen  wrote:
>
> Spark is decoupled from storage. You can write data to any storage you
> like. Anything that can read that data, can read that data - Spark or not,
> different session or not. Temp views are specific to a session and do not
> store data. I think this is trivial and no problem at all, or else I'm not
> clear what you're asking.
>
> On Fri, Jun 24, 2022 at 1:03 PM Chenyang Zhang 
> wrote:
>
>> Hi,
>>
>>
>>
>> Thanks so much for the response from
>> https://issues.apache.org/jira/browse/SPARK-39549.
>> I am curious what do you mean by write down to a table and read it from a
>> different Spark Application. Do you mean a table in a database or the Spark
>> to_table() api? Could I read the table created in different Spark Session
>> through to_table()? Is there any api I could use to achieve my objective? I
>> noticed there are some apis, e.g. createGlobalTempView. Thanks so much for
>> your help.
>>
>>
>>
>> Best regards,
>>
>> Chenyang
>>
>>
>> *Chenyang Zhang*
>> Software Engineering Intern, Platform
>> Redwood City, California
>> 
>> © 2022 C3.ai. Confidential Information.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>


Re: Follow up on Jira Issue 39549

2022-06-24 Thread Sean Owen
Spark is decoupled from storage. You can write data to any storage you
like. Anything that can read that data, can read that data - Spark or not,
different session or not. Temp views are specific to a session and do not
store data. I think this is trivial and no problem at all, or else I'm not
clear what you're asking.
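A hedged sketch of that pattern (the database and table names are invented):
one application writes the data to shared storage as a table, and a completely
separate application, with its own SparkSession, reads it back:

    # application / session 1
    df.write.mode("overwrite").saveAsTable("shared_db.intermediate_results")

    # application / session 2 (possibly on another cluster using the same metastore)
    df2 = spark.read.table("shared_db.intermediate_results")

Writing plain Parquet to a path and reading that path back works the same way
if no shared metastore is available.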

On Fri, Jun 24, 2022 at 1:03 PM Chenyang Zhang  wrote:

> Hi,
>
>
>
> Thanks so much for the response from
> https://issues.apache.org/jira/browse/SPARK-39549. I am curious what do
> you mean by write down to a table and read it from a different Spark
> Application. Do you mean a table in a database or the Spark to_table() api?
> Could I read the table created in different Spark Session through
> to_table()? Is there any api I could use to achieve my objective? I noticed
> there are some apis, e.g. createGlobalTempView. Thanks so much for your
> help.
>
>
>
> Best regards,
>
> Chenyang
>
>
> *Chenyang Zhang*
> Software Engineering Intern, Platform
> Redwood City, California
> 
> © 2022 C3.ai. Confidential Information.
>
>


Re: repartition(n) should be deprecated/alerted

2022-06-22 Thread Sean Owen
Eh, there is a huge caveat - you are making your input non-deterministic,
where determinism is assumed. I don't think that supports such a drastic
statement.

On Wed, Jun 22, 2022 at 12:39 PM Igor Berman  wrote:

> Hi All
> tldr; IMHO repartition(n) should be deprecated or red-flagged, so that
> everybody will understand consequences of usage of this method
>
> Following conversation in
> https://issues.apache.org/jira/browse/SPARK-38388 (still relevant for
> recent versions of spark) I think it's very important to mark this function
> somehow and to alert end-user about consequences of such usage
>
> Basically it may produce duplicates and data loss under retries for
> several kinds of input: among them non-deterministic input, but more
> importantly input that deterministic but might produce not exactly same
> results due to precision of doubles(and floats) in very simple queries like
> following
>
> sqlContext.sql(
> " SELECT integerColumn, SUM(someDoubleTypeValue) AS value
>   FROM data
>   GROUP BY integerColumn "
> ).repartition(3)
>
> (see comment from Tom in ticket)
>
> As an end-user I'd expect the retries mechanism to work in a consistent
> way and not to drop data silently(neither to produce duplicates)
>
> Any thoughts?
> thanks in advance
> Igor
>
>


Re: Spark Summit Europe

2022-06-21 Thread Sean Owen
It's still held, just called the Data and AI Summit.
https://databricks.com/dataaisummit/ Next one is next week; last one in
Europe was in November 2020, and think it might be virtual in Europe if
held separately this year.

On Tue, Jun 21, 2022 at 7:38 AM Gowran, Declan
 wrote:

> Announcing Spark Summit Europe | Apache Spark
> 
>
>
>
> Hello, I see this link is linked to 2015 and does not appear to have
> update. Assume its not held anymore  ?
>
>
>
> Declan
>
>
>
>
>
> **
>
>
> *Declan Gowran | Optum Global Advantage Ireland & UK*
>
> Associate Director - Sr Mgr I O Engineering – Infrastructure Data Science
> Engineering Dublin
>
> Block C, One Spencer Dock, North Wall Quay, Dublin 1  D01 X9R7
>
> +35318651360
>
> declan.gow...@optum.com
>
> *www.optum.com *
>
>
>
> *Optum Services (Ireland) Limited, a private company limited by shares,
> registered in Ireland with company *
>
> *number 579794 and with its registered office at Block C, 1 Spencer Dock,
> North Wall Quay, Dublin 1.*
>
>
>
>
>
>
>
>


Re: How to guarantee dataset is split over unique partitions (partitioned by a column value)

2022-06-20 Thread Sean Owen
repartition() puts all values with the same key in one partition, but,
multiple other keys can be in the same partition. It sounds like you want
groupBy, not repartition, if you want to handle these separately.
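A small PySpark sketch of the difference (the question's code is Scala, and the
processing function here is just a placeholder):

    # co-locates all rows with the same "col" value, but one partition
    # may still hold several different keys
    ds.repartition("col")

    # hands you exactly one key's rows at a time, regardless of partitioning
    import pandas as pd

    def process_one_group(pdf: pd.DataFrame) -> pd.DataFrame:
        # pdf contains only the rows for a single value of "col"
        return pdf

    ds.groupBy("col").applyInPandas(process_one_group, schema=ds.schema)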

On Mon, Jun 20, 2022 at 8:26 AM DESCOTTE Loic - externe
 wrote:

> Hi,
>
>
>
> I have a data type like this :
>
>case class Data(col: String, ...)
>
>
>
> and a Dataset[Data] ds. Some rows have columns filled with value 'a', and
> other with value 'b', etc.
>
>
>
> I want to process separately all data with a 'a', and all data with a 'b'.
> But I also need to have all the 'a' in the same partition.
>
>
>
> If I do : ds.repartition(col("col")).mapPartition(data => ???)
>
>
>
> Is it guaranteed by default that I will have all the 'a' in a single
> partition, and no 'b' mixed with it in this partition?
>
>
>
> My understanding is that all the 'a' will be in the same partition, but
> 'a' and 'b' may be mixed. So I should do :
>
>
>
> val nbDistinct = ds.select("col").distinct.count
>
> ds.repartition(col("col")).mapPartition{ data =>
>
>// split mixed values in a single partition with group by :
>
>data.groupBy(_.col).flatMap { case (col, rows) => ??? }
>
> }
>
>
>
> Is that correct?
>
>
>
>
>
> I can also do this to force the number of partitions with the number of
> distinct values :
>
> val nbDistinct = ds.select("col").distinct.count
>
> ds.repartition(nbDistinct , col("col")).mapPartition(data => ???)
>
>
>
> But is it useful?
>
> But it adds an action that may be expensive in some cases, and sometimes
> it seems to use less partitions than it should.
>
>
>
> Example (spark shell) :
>
> scala> (Range(0,10).map(_ => Data("a")) union Range(0,10).map(_ =>
> Data("b")) union Range(0,10).map(_ =>
> Data("c"))).toList.toDS.repartition(col("col")).rdd.foreachPartition(part
> => println(part.mkString))
>
>
>
> output :
>
> Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)
>
> Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)
>
> Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)
>
>
>
> I have 3 partitions + a lot of empty partitions (blank lines)
>
> If I do :
>
>
>
> scala> (Range(0,10).map(_ => Data("a")) union Range(0,10).map(_ =>
> Data("b")) union Range(0,10).map(_ =>
> Data("c"))).toList.toDS.repartition(3,col("col")).rdd.foreachPartition(part
> => println(part.mkString))
>
> then output is :
>
>
> Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(a)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(b)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)Data(c)
>
>
>
> I would like to have 3 partitions with all ‘a’ (and only the ‘a’), then
> all ‘b’ etc. but I have 2 empty partitions and a partition with all the
> ‘a’, ‘b’ and ‘c’ of my dataset.
>
>
>
> So, is there a good way to guarantee a dataset is split over unique
> partitions for a column value?
>
>
>
> Thanks,
>
> Loïc
>
>
>


Re: how to properly filter a dataset by dates ?

2022-06-17 Thread Sean Owen
Look at your query again. You are comparing dates to strings. The dates
widen back to strings.

On Fri, Jun 17, 2022, 1:39 PM marc nicole  wrote:

> I also tried:
>
> dataset =
>>     dataset.where(to_date(dataset.col("Date"), "MM-dd-yyyy").geq("02-03-2012"));
>
>
> But it returned an empty dataset.
>
> Le ven. 17 juin 2022 à 20:28, Sean Owen  a écrit :
>
>> Same answer as last time - those are strings, not dates. 02-02-2015 as a
>> string is before 02-03-2012.
>> You apply date function to dates, not strings.
>> You have to parse the dates properly, which was the problem in your last
>> email.
>>
>> On Fri, Jun 17, 2022 at 12:58 PM marc nicole  wrote:
>>
>>> Hello,
>>>
>>> I have a dataset containing a column of dates, which I want to use for
>>> filtering. Nothing, from what I have tried, seems to return the exact right
>>> solution.
>>> Here's my input:
>>>
>>> +------------+
>>> |    Date    |
>>> +------------+
>>> | 02-08-2019 |
>>> | 02-07-2019 |
>>> | 12-01-2019 |
>>> | 02-02-2015 |
>>> | 02-03-2012 |
>>> | 05-06-2018 |
>>> | 02-08-2022 |
>>> +------------+
>>>
>>> The code that i have tried (always giving missing dates in the result):
>>>
>>> dataset = dataset.filter( dataset.col("Date").geq("02-03-2012"));  //
>>>> not showing the date of *02-02-2015*
>>>
>>>
>>> I tried to apply *date_trunc()* with the first parameter "day" but
>>> nothing.
>>>
>>> I have also compared a converted column (using *to_date()*) with a
>>> *literal *of the target date but always returning an empty dataset.
>>>
>>> How to do that in Java ?
>>>
>>>


Re: how to properly filter a dataset by dates ?

2022-06-17 Thread Sean Owen
Same answer as last time - those are strings, not dates. 02-02-2015 as a
string is before 02-03-2012.
You apply date function to dates, not strings.
You have to parse the dates properly, which was the problem in your last
email.
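An equivalent PySpark sketch of the fix (the thread's code is Java, but the same
functions exist there): parse both sides as dates before comparing.

    from pyspark.sql.functions import to_date, col, lit

    result = dataset.filter(
        to_date(col("Date"), "MM-dd-yyyy") >= to_date(lit("02-03-2012"), "MM-dd-yyyy")
    )

With real dates on both sides, 02-02-2015 is correctly included in the result.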

On Fri, Jun 17, 2022 at 12:58 PM marc nicole  wrote:

> Hello,
>
> I have a dataset containing a column of dates, which I want to use for
> filtering. Nothing, from what I have tried, seems to return the exact right
> solution.
> Here's my input:
>
> +------------+
> |    Date    |
> +------------+
> | 02-08-2019 |
> | 02-07-2019 |
> | 12-01-2019 |
> | 02-02-2015 |
> | 02-03-2012 |
> | 05-06-2018 |
> | 02-08-2022 |
> +------------+
>
> The code that i have tried (always giving missing dates in the result):
>
> dataset = dataset.filter( dataset.col("Date").geq("02-03-2012"));  // not
>> showing the date of *02-02-2015*
>
>
> I tried to apply *date_trunc()* with the first parameter "day" but
> nothing.
>
> I have also compared a converted column (using *to_date()*) with a
> *literal *of the target date but always returning an empty dataset.
>
> How to do that in Java ?
>
>


Re: How to recognize and get the min of a date/string column in Java?

2022-06-14 Thread Sean Owen
Look at your data - doesn't match date format you give

On Tue, Jun 14, 2022, 3:41 PM marc nicole  wrote:

> for the input  (I changed the format)  :
>
> +------------+
> |    Date    |
> +------------+
> | 2019-02-08 |
> | 2019-02-07 |
> | 2019-12-01 |
> | 2015-02-02 |
> | 2012-02-03 |
> | 2018-05-06 |
> | 2022-02-08 |
> +------------+
> the output was 2012-01-03
>
> To note that for my below code to work I cast to string the resulting min
> column.
>
> Le mar. 14 juin 2022 à 21:12, Sean Owen  a écrit :
>
>> You haven't shown your input or the result
>>
>> On Tue, Jun 14, 2022 at 1:40 PM marc nicole  wrote:
>>
>>> Hi Sean,
>>>
>>> Even with MM for months it gives incorrect (but different this time) min
>>> value.
>>>
>>> Le mar. 14 juin 2022 à 20:18, Sean Owen  a écrit :
>>>
>>>> Yes that is right. It has to be parsed as a date to correctly reason
>>>> about ordering. Otherwise you are finding the minimum string
>>>> alphabetically.
>>>>
>>>> Small note, MM is month. mm is minute. You have to fix that for this to
>>>> work. These are Java format strings.
>>>>
>>>> On Tue, Jun 14, 2022, 12:32 PM marc nicole  wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I want to identify a column of dates as such, the column has formatted
>>>>> strings in the likes of: "06-14-2022" (the format being mm-dd-) and 
>>>>> get
>>>>> the minimum of those dates.
>>>>>
>>>>> I tried in Java as follows:
>>>>>
>>>>> if (dataset.filter(org.apache.spark.sql.functions.to_date(
>>>>>> dataset.col(colName), "mm-dd-").isNotNull()).select(colName).count() 
>>>>>> !=
>>>>>> 0) { 
>>>>>
>>>>>
>>>>> And to get the *min *of the column:
>>>>>
>>>>> Object colMin =
>>>>>> dataset.agg(org.apache.spark.sql.functions.min(org.apache.spark.sql.functions.to_date(dataset.col(colName),
>>>>>> "mm-dd-"))).first().get(0);
>>>>>
>>>>> // then I cast the *colMin *to string.
>>>>>
>>>>> To note that if i don't apply *to_date*() to the target column then
>>>>> the result will be erroneous (i think Spark will take the values as string
>>>>> and will get the min as if it was applied on an alphabetical string).
>>>>>
>>>>> Any better approach to accomplish this?
>>>>> Thanks.
>>>>>
>>>>


Re: How to recognize and get the min of a date/string column in Java?

2022-06-14 Thread Sean Owen
Yes that is right. It has to be parsed as a date to correctly reason about
ordering. Otherwise you are finding the minimum string alphabetically.

Small note, MM is month. mm is minute. You have to fix that for this to
work. These are Java format strings.
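A PySpark equivalent of the corrected approach (the thread's code is Java; the
column name is taken from the question):

    from pyspark.sql.functions import to_date, col, min as min_

    col_min = dataset.agg(
        min_(to_date(col("Date"), "MM-dd-yyyy"))
    ).first()[0]

Because the min is taken over real dates, the result no longer depends on
alphabetical string ordering.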

On Tue, Jun 14, 2022, 12:32 PM marc nicole  wrote:

> Hi,
>
> I want to identify a column of dates as such, the column has formatted
> strings in the likes of: "06-14-2022" (the format being mm-dd-yyyy) and get
> the minimum of those dates.
>
> I tried in Java as follows:
>
> if (dataset.filter(org.apache.spark.sql.functions.to_date(
>>         dataset.col(colName), "mm-dd-yyyy").isNotNull()).select(colName).count() != 0) { ...
>
> And to get the *min* of the column:
>
> Object colMin =
>>         dataset.agg(org.apache.spark.sql.functions.min(org.apache.spark.sql.functions.to_date(dataset.col(colName),
>>         "mm-dd-yyyy"))).first().get(0);
>
> // then I cast the *colMin* to string.
> // then I cast the *colMin *to string.
>
> To note that if i don't apply *to_date*() to the target column then the
> result will be erroneous (i think Spark will take the values as string and
> will get the min as if it was applied on an alphabetical string).
>
> Any better approach to accomplish this?
> Thanks.
>


Re: API Problem

2022-06-09 Thread Sean Owen
That repartition seems to do nothing? But yes the key point is use col()

On Thu, Jun 9, 2022, 9:41 PM Stelios Philippou  wrote:

> Perhaps
>
>
> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn("status_for_batch
>
> To
>
> finalDF.repartition(finalDF.rdd.getNumPartitions()).withColumn(col("status_for_batch")
>
>
>
>
> On Thu, 9 Jun 2022, 22:32 Sid,  wrote:
>
>> Hi Experts,
>>
>> I am facing one problem while passing a column to the method.  The
>> problem is described in detail here:
>>
>>
>> https://stackoverflow.com/questions/72565095/how-to-pass-columns-as-a-json-record-to-the-api-method-using-pyspark
>>
>> TIA,
>> Sid
>>
>


Re: How the data is distributed

2022-06-06 Thread Sean Owen
Data is not distributed to executors by anything: if you are processing data
with Spark, Spark spawns tasks on executors that read chunks of data from
wherever the data lives (S3, HDFS, etc.).


On Mon, Jun 6, 2022 at 4:07 PM Sid  wrote:

> Hi experts,
>
>
> When we load any file, I know that based on the information in the spark
> session about the executors location, status and etc , the data is
> distributed among the worker nodes and executors.
>
> But I have one doubt. Is the data initially loaded on the driver and then
> it is distributed or it is directly distributed amongst the workers?
>
> Thanks,
> Sid
>


Re: How to convert a Dataset<Row> to a Dataset<String>?

2022-06-04 Thread Sean Owen
It sounds like you want to interpret the input as strings, do some
processing, then infer the schema. That has nothing to do with construing
the entire row as a string like "Row[foo=bar, baz=1]"
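A hedged sketch of that read-as-strings-then-infer approach, matching the plan
described in the quoted message below (shown in PySpark for brevity, though the
thread's code is Java; the file paths and header option are assumptions):

    # read everything as strings first
    raw = spark.read.option("header", True).csv("input.csv")

    # drop the rows holding the "+" placeholder value in any column
    cleaned = raw
    for c in raw.columns:
        cleaned = cleaned.filter(cleaned[c] != "+")

    # write the cleaned rows out and re-read them with schema inference
    cleaned.write.mode("overwrite").option("header", True).csv("cleaned_csv")
    typed = spark.read.option("header", True) \
                      .option("inferSchema", True).csv("cleaned_csv")

This avoids converting Rows to strings at all; the columns stay columns.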

On Sat, Jun 4, 2022 at 10:32 AM marc nicole  wrote:

> Hi Sean,
>
> Thanks, actually I have a dataset where I want to inferSchema after
> discarding the specific String value of "+". I do this because the column
> would be considered StringType while if i remove that "+" value it will be
> considered DoubleType for example or something else. Basically I want to
> remove "+" from all dataset rows and then inferschema.
> Here my idea is to filter the rows not equal to "+" for the target columns
> (potentially all of them) and then use spark.read().csv() to read the new
> filtered dataset with the option inferSchema which would then yield correct
> column types.
> What do you think?
>
> Le sam. 4 juin 2022 à 15:56, Sean Owen  a écrit :
>
>> I don't think you want to do that. You get a string representation of
>> structured data without the structure, at best. This is part of the reason
>> it doesn't work directly this way.
>> You can use a UDF to call .toString on the Row of course, but, again
>> what are you really trying to do?
>>
>> On Sat, Jun 4, 2022 at 7:35 AM marc nicole  wrote:
>>
>>> Hi,
>>> How to convert a Dataset<Row> to a Dataset<String>?
>>> What I have tried is:
>>>
>>> List<String> list = dataset.as(Encoders.STRING()).collectAsList();
>>> Dataset<String> datasetSt = spark.createDataset(list, Encoders.STRING());
>>> // But this line raises a org.apache.spark.sql.AnalysisException: Try to
>>> map struct... to Tuple1, but failed as the number of fields does not line
>>> up
>>>
>>> Type of columns being String
>>> How to solve this?
>>>
>>


Re: How to convert a Dataset<Row> to a Dataset<String>?

2022-06-04 Thread Sean Owen
I don't think you want to do that. You get a string representation of
structured data without the structure, at best. This is part of the reason
it doesn't work directly this way.
You can use a UDF to call .toString on the Row of course, but, again
what are you really trying to do?

On Sat, Jun 4, 2022 at 7:35 AM marc nicole  wrote:

> Hi,
> How to convert a Dataset<Row> to a Dataset<String>?
> What I have tried is:
>
> List<String> list = dataset.as(Encoders.STRING()).collectAsList();
> Dataset<String> datasetSt = spark.createDataset(list, Encoders.STRING());
> // But this line raises a org.apache.spark.sql.AnalysisException: Try to
> map struct... to Tuple1, but failed as the number of fields does not line
> up
>
> Type of columns being String
> How to solve this?
>


Re: [Spark SQL]: Does Spark SQL support WAITFOR?

2022-05-17 Thread Sean Owen
I don't think that is standard SQL. What are you trying to do, and why not do
it outside SQL?
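If the goal is just to pause between two statements for a timeout test, the
sleep can live in the driver program instead of in SQL; for example, from
PySpark:

    import time

    spark.sql("SELECT 1").show()
    time.sleep(30)              # wait 30 seconds outside of SQL
    spark.sql("SELECT 2").show()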

On Tue, May 17, 2022 at 6:03 PM K. N. Ramachandran 
wrote:

> Gentle ping. Any info here would be great.
>
> Regards,
> Ram
>
> On Sun, May 15, 2022 at 5:16 PM K. N. Ramachandran 
> wrote:
>
>> Hello Spark Users Group,
>>
>> I've just recently started working on tools that use Apache Spark.
>> When I try WAITFOR in the spark-sql command line, I just get:
>>
>> Error: Error running query:
>> org.apache.spark.sql.catalyst.parser.ParseException:
>>
>> mismatched input 'WAITFOR' expecting (.. list of allowed commands..)
>>
>>
>> 1) Why is WAITFOR not allowed? Is there another way to get a process to
>> sleep for a desired period of time? I'm trying to test a timeout issue and
>> need to simulate a sleep behavior.
>>
>>
>> 2) Is there documentation that outlines why WAITFOR is not supported? I
>> did not find any good matches searching online.
>>
>> Thanks,
>> Ram
>>
>
>
> --
> K.N.Ramachandran
> Ph: 814-441-4279
>


Re: How do I read parquet with python object

2022-05-09 Thread Sean Owen
That's a parquet library error. It might be this:
https://issues.apache.org/jira/browse/PARQUET-1633 That's fixed in recent
versions of Parquet. You didn't say what versions of libraries you are
using, but try the latest Spark.


On Mon, May 9, 2022 at 8:49 AM  wrote:

> # python:
>
> import pandas as pd
>
> a = pd.DataFrame([[1, [2.3, 1.2]]], columns=['a', 'b'])
> a.to_parquet('a.parquet')
>
> # pyspark:
>
> d2 = spark.read.parquet('a.parquet')
>
> will return error:
>
> An error was encountered: An error occurred while calling o277.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.2 in
> stage 9.0 (TID 63, 10.169.0.196, executor 15):
> java.lang.IllegalArgumentException: Illegal Capacity: -221
>
> how can I fix it?
>
> Thanks.
>
>
>


Re: Reg: CVE-2020-9480

2022-04-28 Thread Sean Owen
It is not a real dependency, so should not be any issue. I am not sure why
your tool flags it at all.

On Thu, Apr 28, 2022 at 10:04 PM Sundar Sabapathi Meenakshi <
sun...@mcruncher.com> wrote:

> Hi all,
>
>   I am using spark-sql_2.12 dependency version 3.2.1 in my
> project. My dependency tracker highlights  the transitive dependency
> "unused"  from  spark-sql_2.12 as vulnerable. I check there is no update
> for these artifacts since 2014. Is the artifact used anywhere in spark ?
>
> To resolve this vulnerability,  can I exclude this "unused" artifact from
> spark-sql_2.12 ?  Will it cause any issues in my project ?
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: When should we cache / persist ? After or Before Actions?

2022-04-27 Thread Sean Owen
You certainly shouldn't just sprinkle them in, no, that has never been the
idea here. It can help in some cases, but is just overhead in others.
Be thoughtful about why you are adding these statements.

On Wed, Apr 27, 2022 at 11:16 AM Koert Kuipers  wrote:

> we have quite a few persists statements in our codebase whenever we are
> reusing a dataframe.
> we noticed that it slows things down quite a bit (sometimes doubles the
> runtime), while providing little benefits, since spark already re-uses the
> shuffle files underlying the dataframe efficiently even if you don't do the
> persist.
> so at this point i am considering removing those persist statements...
> not sure what other peoples experiences are on this
>
> ‪On Thu, Apr 21, 2022 at 9:41 AM ‫"Yuri Oleynikov (‫יורי אולייניקוב‬‎)"‬‎ <
> yur...@gmail.com> wrote:‬
>
>> Hi Sean
>>
>> Persisting/caching is useful when you’re going to reuse dataframe. So in
>> your case no persisting/caching is required. This is regarding to “when”.
>>
>> The “where” usually belongs to the closest point of reusing
>> calculations/transformations
>>
>> Btw, I’m not sure if caching is useful when you have a HUGE dataframe.
>> Maybe persisting will be more useful
>>
>> Best regards
>>
>> On 21 Apr 2022, at 16:24, Sean Owen  wrote:
>>
>> 
>> You persist before actions, not after, if you want the action's outputs
>> to be persistent.
>> If anything swap line 2 and 3. However, there's no point in the count()
>> here, and because there is already only one action following to write, no
>> caching is useful in that example.
>>
>> On Thu, Apr 21, 2022 at 2:26 AM Sid  wrote:
>>
>>> Hi Folks,
>>>
>>> I am working on Spark Dataframe API where I am doing following thing:
>>>
>>> 1) df = spark.sql("some sql on huge dataset").persist()
>>> 2) df1 = df.count()
>>> 3) df.repartition().write.mode().parquet("")
>>>
>>>
>>> AFAIK, persist should be used after count statement if at all it is
>>> needed to be used since spark is lazily evaluated and if I call any action
>>> it will recompute the above code and hence no use of persisting it before
>>> action.
>>>
>>> Therefore, it should be something like the below that should give better
>>> performance.
>>> 1) df= spark.sql("some sql on huge dataset")
>>> 2) df1 = df.count()
>>> 3) df.persist()
>>> 4) df.repartition().write.mode().parquet("")
>>>
>>> So please help me to understand how it should be exactly and why? If I
>>> am not correct
>>>
>>> Thanks,
>>> Sid
>>>
>>>


Re: Why is spark running multiple stages with the same code line?

2022-04-21 Thread Sean Owen
The line of code triggers a job, the job triggers stages. You should see
they are different operations, all supporting execution of the action on
that line.

On Thu, Apr 21, 2022 at 9:24 AM Joe  wrote:

> Hi Sean,
> Thanks for replying but my question was about multiple stages running
> the same line of code, not about multiple stages in general. Yes single
> job can have multiple stages, but they should not be repeated, as far
> as I know, if you're caching/persisting your intermediate outputs.
>
> My question is why am I seeing multiple stages running the same line of
> code? As I understand it stage is a grouping of operations that can be
> executed without shuffling data or invoking a new action and they are
> divided into tasks, and tasks are the ones that are executed in
> parallel and can have the same line of code running on different
> executors. Or is this assumption wrong?
> Thanks,
>
> Joe
>
>
> On Thu, 2022-04-21 at 09:14 -0500, Sean Owen wrote:
> > A job can have multiple stages for sure. One action triggers a job.
> > This seems normal.
> >
> > On Thu, Apr 21, 2022, 9:10 AM Joe  wrote:
> > > Hi,
> > > When looking at application UI (in Amazon EMR) I'm seeing one job
> > > for
> > > my particular line of code, for example:
> > > 64 Running count at MySparkJob.scala:540
> > >
> > > When I click into the job and go to stages I can see over a 100
> > > stages
> > > running the same line of code (stages are active, pending or
> > > completed):
> > > 190 Pending count at MySparkJob.scala:540
> > > ...
> > > 162 Active count at MySparkJob.scala:540
> > > ...
> > > 108 Completed count at MySparkJob.scala:540
> > > ...
> > >
> > > I'm not sure what that means, I thought that stage was a logical
> > > operation boundary and you could have only one stage in the job
> > > (unless
> > > you executed the same dataset+action many times on purpose) and
> > > tasks
> > > were the ones that were replicated across partitions. But here I'm
> > > seeing many stages running, each with the same line of code?
> > >
> > > I don't have a situation where my code is re-processing the same
> > > set of
> > > data many times, all intermediate sets are persisted.
> > > I'm not sure if EMR UI display is wrong or if spark stages are not
> > > what
> > > I thought they were?
> > > Thanks,
> > >
> > > Joe
> > >
> > >
> > >
> > > ---
> > > --
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
>
>
>


Re: Why is spark running multiple stages with the same code line?

2022-04-21 Thread Sean Owen
A job can have multiple stages for sure. One action triggers a job. This
seems normal.

On Thu, Apr 21, 2022, 9:10 AM Joe  wrote:

> Hi,
> When looking at application UI (in Amazon EMR) I'm seeing one job for
> my particular line of code, for example:
> 64 Running count at MySparkJob.scala:540
>
> When I click into the job and go to stages I can see over a 100 stages
> running the same line of code (stages are active, pending or
> completed):
> 190 Pending count at MySparkJob.scala:540
> ...
> 162 Active count at MySparkJob.scala:540
> ...
> 108 Completed count at MySparkJob.scala:540
> ...
>
> I'm not sure what that means, I thought that stage was a logical
> operation boundary and you could have only one stage in the job (unless
> you executed the same dataset+action many times on purpose) and tasks
> were the ones that were replicated across partitions. But here I'm
> seeing many stages running, each with the same line of code?
>
> I don't have a situation where my code is re-processing the same set of
> data many times, all intermediate sets are persisted.
> I'm not sure if EMR UI display is wrong or if spark stages are not what
> I thought they were?
> Thanks,
>
> Joe
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: When should we cache / persist ? After or Before Actions?

2022-04-21 Thread Sean Owen
You persist before actions, not after, if you want the action's outputs to
be persistent.
If anything, swap lines 2 and 3. However, there's no point in the count()
here, and because the write is the only action that follows, no caching is
useful in that example.
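For illustration, a rough sketch of the ordering being described, with placeholder table and path names:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("persist-demo").getOrCreate()

    # Mark the DataFrame for caching before any action runs against it.
    df = spark.sql("SELECT * FROM some_huge_table").persist()

    n = df.count()  # first action materializes the cache; later actions reuse it

    df.repartition(200).write.mode("overwrite").parquet("/tmp/output/demo")
    df.unpersist()

If the write is the only action, dropping both the count() and the persist() gives the same result with less work.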

On Thu, Apr 21, 2022 at 2:26 AM Sid  wrote:

> Hi Folks,
>
> I am working on Spark Dataframe API where I am doing following thing:
>
> 1) df = spark.sql("some sql on huge dataset").persist()
> 2) df1 = df.count()
> 3) df.repartition().write.mode().parquet("")
>
>
> AFAIK, persist should be used after count statement if at all it is needed
> to be used since spark is lazily evaluated and if I call any action it will
> recompute the above code and hence no use of persisting it before action.
>
> Therefore, it should be something like the below that should give better
> performance.
> 1) df= spark.sql("some sql on huge dataset")
> 2) df1 = df.count()
> 3) df.persist()
> 4) df.repartition().write.mode().parquet("")
>
> So please help me to understand how it should be exactly and why? If I am
> not correct
>
> Thanks,
> Sid
>
>


Re: How is union() implemented? Need to implement column bind

2022-04-21 Thread Sean Owen
Not a max - all values are needed. pivot() if anything is much closer, but
see the rest of this thread.

On Thu, Apr 21, 2022 at 1:19 AM Sonal Goyal  wrote:

> Seems like an interesting problem to solve!
>
> If I have understood it correctly, you have 10114 files each with the
> structure
>
> rowid, colA
> r1, a
> r2, b
> r3, c
> ...5 million rows
>
> if you union them, you will have
> rowid, colA, colB
> r1, a, null
> r2, b, null
> r3, c, null
> r1, null, d
> r2, null, e
> r3, null, f
>
> Will a window partition by rowid and max on column values not work ?
>
> Cheers,
> Sonal
> https://github.com/zinggAI/zingg
>
>
>
> On Thu, Apr 21, 2022 at 6:50 AM Sean Owen  wrote:
>
>> Oh, Spark directly supports upserts (with the right data destination) and
>> yeah you could do this as 1+ updates to a table without any pivoting,
>> etc. It'd still end up being 10K+ single joins along the way but individual
>> steps are simpler. It might actually be pretty efficient I/O wise as
>> columnar formats would not rewrite any other data on a write like this.
>>
>> On Wed, Apr 20, 2022 at 8:09 PM Andrew Davidson 
>> wrote:
>>
>>> Hi Sean
>>>
>>>
>>>
>>> My “insert” solution is hack that might work give we can easily spin up
>>> a single VM with a crazy amouts of memory. I would prefer to see a
>>> distributed solution. It is just a matter of time before someone want to
>>> create an even bigger table using cbind.
>>>
>>>
>>>
>>> I understand you probably already know a lot about traditional RDBS’s.
>>> Much of my post is back ground for others
>>>
>>>
>>>
>>> I used to do some of classic relational database work before tools like
>>> Hadoop, spark and NoSQL became available .
>>>
>>>
>>>
>>> The standard operations on a single table in a relation database are
>>>
>>>
>>>
>>> Insert “row”. This is similar to spark union.  Typically primary keys in
>>>  in rbdms tables are indexed  to enable quick look up. So insert is
>>> probably not 1 for. 1 with union. The row may not simply be appended to the
>>> end of the table.
>>>
>>>
>>>
>>> Update a “row”
>>>
>>> Delete a “row”
>>>
>>> Select “rows where”
>>>
>>>
>>>
>>> Rdms server enable row and table level locking. Data must always be in a
>>> consistent state. You must commit or abort you changes for them to persist
>>> and to release locks on the data. Locks are required because you have a
>>> single resource and may user requesting service simultaneously. This is
>>> very different from Spark
>>>
>>>
>>>
>>> Storage and memory used to be really expensive so often people tried to
>>> create “1st normal form” schemas. I.E. no duplicate data to reduce
>>> hardware cost.  1st normal design require you to use joins to the get
>>> data table you want. Joins are expensive. Often design duplicated some data
>>> to improve performance by minimize the number of joins required. Duplicate
>>> data make maintaining consistency harder. There are other advantages to
>>> normalized data design and as we are all aware in the bigdata world lots of
>>> disadvantages. The dbms ran on a single big machine. Join was not
>>> implemented as distributed map/reduce.
>>>
>>>
>>>
>>> So My idea is use a traditional RDMS server: my final table will have 5
>>> million rows and 10,114 columns.
>>>
>>>1. Read the column vector from each of 10,114 data files
>>>2. insert the column vector as a row in the table
>>>   1. I read a file that has a single element on each line. All I
>>>   need to do is replace \n with ,
>>>3. Now I have table with 10,115 rows and 5 million columns
>>>4. The row id (primary key) is the original file name
>>>5. The columns are the row ids in the original column vectors
>>>6. Now all I need to do is pivot this single table to get what I
>>>want. This is the only join or map/reduce like operation
>>>7. A table with 5million rows and 10,114 columns
>>>
>>>
>>>
>>>
>>>
>>> My final table is about 220 gb. I know at google my I have quota for up
>>> 2 mega mem machines. Each one has some think like 1.4 Tb of memory
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>>


Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
Oh, Spark directly supports upserts (with the right data destination) and
yeah you could do this as 1+ updates to a table without any pivoting,
etc. It'd still end up being 10K+ single joins along the way but individual
steps are simpler. It might actually be pretty efficient I/O wise as
columnar formats would not rewrite any other data on a write like this.

On Wed, Apr 20, 2022 at 8:09 PM Andrew Davidson  wrote:

> Hi Sean
>
>
>
> My “insert” solution is a hack that might work given we can easily spin up a
> single VM with a crazy amount of memory. I would prefer to see a
> distributed solution. It is just a matter of time before someone wants to
> create an even bigger table using cbind.
>
>
>
> I understand you probably already know a lot about traditional RDBS’s.
> Much of my post is back ground for others
>
>
>
> I used to do some of classic relational database work before tools like
> Hadoop, spark and NoSQL became available .
>
>
>
> The standard operations on a single table in a relation database are
>
>
>
> Insert “row”. This is similar to spark union. Typically primary keys in
> rdbms tables are indexed to enable quick lookup. So insert is probably not
> 1-for-1 with union. The row may not simply be appended to the end of the
> table.
>
>
>
> Update a “row”
>
> Delete a “row”
>
> Select “rows where”
>
>
>
> Rdbms servers enable row and table level locking. Data must always be in a
> consistent state. You must commit or abort your changes for them to persist
> and to release locks on the data. Locks are required because you have a
> single resource and many users requesting service simultaneously. This is
> very different from Spark
>
>
>
> Storage and memory used to be really expensive so often people tried to
> create “1st normal form” schemas. I.E. no duplicate data to reduce
> hardware cost.  1st normal design require you to use joins to the get
> data table you want. Joins are expensive. Often design duplicated some data
> to improve performance by minimize the number of joins required. Duplicate
> data make maintaining consistency harder. There are other advantages to
> normalized data design and as we are all aware in the bigdata world lots of
> disadvantages. The dbms ran on a single big machine. Join was not
> implemented as distributed map/reduce.
>
>
>
> So My idea is use a traditional RDMS server: my final table will have 5
> million rows and 10,114 columns.
>
>1. Read the column vector from each of 10,114 data files
>2. insert the column vector as a row in the table
>   1. I read a file that has a single element on each line. All I need
>   to do is replace \n with ,
>3. Now I have table with 10,115 rows and 5 million columns
>4. The row id (primary key) is the original file name
>5. The columns are the row ids in the original column vectors
>6. Now all I need to do is pivot this single table to get what I want.
>This is the only join or map/reduce like operation
>7. A table with 5million rows and 10,114 columns
>
>
>
>
>
> My final table is about 220 GB. I know at Google I have quota for up to 2
> mega-mem machines. Each one has something like 1.4 TB of memory
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
>


Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
Wait, how is all that related to cbind? Inserting rows is very different from
what's needed here.
BigQuery is unrelated to MR or Spark. It is, however, a SQL engine, but can
you express this in SQL without joins? I'm just guessing joining 10K+
tables is hard anywhere.

On Wed, Apr 20, 2022 at 7:32 PM Andrew Davidson  wrote:

> I was thinking about something like bigQuery a little more. I do not know
> how it is implemented. However I believe traditional relational databases
> are row oriented and typically run on single machine. You can lock at the
> row level. This leads me to speculate that row level inserts maybe more
> efficient that the way spark implements union. One way to create my uber
> matrix would be to read the column vectors from the  10,114 individual
> files and insert them as rows in a table, then pivot the table.  I am going
> to poke around a bit. For all I know bigQuery use map reduce like spark.
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *Sean Owen 
> *Date: *Wednesday, April 20, 2022 at 2:31 PM
> *To: *Andrew Melo 
> *Cc: *Andrew Davidson , Bjørn Jørgensen <
> bjornjorgen...@gmail.com>, "user @spark" 
> *Subject: *Re: How is union() implemented? Need to implement column bind
>
>
>
> I don't think there's fundamental disapproval (it is implemented in
> sparklyr) just a question of how you make this work at scale in general.
> It's not a super natural operation in this context but can be done. If you
> find a successful solution at extremes then maybe it generalizes.
>
>
>
> On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo  wrote:
>
> It would certainly be useful for our domain to have some sort of native
> cbind(). Is there a fundamental disapproval of adding that functionality,
> or is it just a matter of nobody implementing it?
>
>
>
> On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:
>
> Good lead, pandas on Spark concat() is worth trying. It looks like it uses
> a join, but not 100% sure from the source.
>
> The SQL concat() function is indeed a different thing.
>
>
>
> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
> wrote:
>
> Sorry for asking. But why does`t concat work?
>
>
>
> Pandas on spark have ps.concat
> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>  which
> takes 2 dataframes and concat them to 1 dataframe.
>
> It seems
> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
> like the pyspark version takes 2 columns and concat it to one column.
>
>
>
> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>
> cbind? yeah though the answer is typically a join. I don't know if there's
> a better option in a SQL engine, as SQL doesn't have anything to offer
> except join and pivot either (? right?)
>
> Certainly, the dominant data storage paradigm is wide tables, whereas
> you're starting with effectively a huge number of tiny slim tables, which
> is the impedance mismatch here.
>
>
>
> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson  wrote:
>
> Thanks Sean
>
>
>
> I imagine this is a fairly common problem in data science. Any idea how
> other solve?  For example I wonder if running join something like BigQuery
> might work better? I do not know much about the implementation.
>
>
>
> No one tool will  solve all problems. Once I get the matrix I think it
> spark will work well for our need
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *Sean Owen 
> *Date: *Monday, April 18, 2022 at 6:58 PM
> *To: *Andrew Davidson 
> *Cc: *"user @spark" 
> *Subject: *Re: How is union() implemented? Need to implement column bind
>
>
>
> A join is the natural answer, but this is a 10114-way join, which probably
> chokes readily just to even plan it, let alone all the shuffling and
> shuffling of huge data. You could tune your way out of it maybe, but not
> optimistic. It's just huge.
>
>
>
> You could go off-road and lower-level to take advantage of the structure
> of the data. You effectively want "column bind". There is no such operation
> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
> is in the RDD API, and to my surprise, not in the Python API but exists in
> Scala. And R (!). If you can read several RDDs of data, you can use this
> method to pair all their corresponding values and ultimately get rows of
> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>
>
>
> The issue I see is that you ca

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
I don't think there's fundamental disapproval (it is implemented in
sparklyr) just a question of how you make this work at scale in general.
It's not a super natural operation in this context but can be done. If you
find a successful solution at extremes then maybe it generalizes.

On Wed, Apr 20, 2022 at 4:29 PM Andrew Melo  wrote:

> It would certainly be useful for our domain to have some sort of native
> cbind(). Is there a fundamental disapproval of adding that functionality,
> or is it just a matter of nobody implementing it?
>
> On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:
>
>> Good lead, pandas on Spark concat() is worth trying. It looks like it
>> uses a join, but not 100% sure from the source.
>> The SQL concat() function is indeed a different thing.
>>
>> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
>> wrote:
>>
>>> Sorry for asking. But why does`t concat work?
>>>
>>> Pandas on spark have ps.concat
>>> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>>>  which
>>> takes 2 dataframes and concat them to 1 dataframe.
>>> It seems
>>> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
>>> like the pyspark version takes 2 columns and concat it to one column.
>>>
>>> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>>>
>>>> cbind? yeah though the answer is typically a join. I don't know if
>>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>>> offer except join and pivot either (? right?)
>>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>>> you're starting with effectively a huge number of tiny slim tables, which
>>>> is the impedance mismatch here.
>>>>
>>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>>>> wrote:
>>>>
>>>>> Thanks Sean
>>>>>
>>>>>
>>>>>
>>>>> I imagine this is a fairly common problem in data science. Any idea
>>>>> how other solve?  For example I wonder if running join something like
>>>>> BigQuery might work better? I do not know much about the implementation.
>>>>>
>>>>>
>>>>>
>>>>> No one tool will  solve all problems. Once I get the matrix I think it
>>>>> spark will work well for our need
>>>>>
>>>>>
>>>>>
>>>>> Kind regards
>>>>>
>>>>>
>>>>>
>>>>> Andy
>>>>>
>>>>>
>>>>>
>>>>> *From: *Sean Owen 
>>>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>>>> *To: *Andrew Davidson 
>>>>> *Cc: *"user @spark" 
>>>>> *Subject: *Re: How is union() implemented? Need to implement column
>>>>> bind
>>>>>
>>>>>
>>>>>
>>>>> A join is the natural answer, but this is a 10114-way join, which
>>>>> probably chokes readily just to even plan it, let alone all the shuffling
>>>>> and shuffling of huge data. You could tune your way out of it maybe, but
>>>>> not optimistic. It's just huge.
>>>>>
>>>>>
>>>>>
>>>>> You could go off-road and lower-level to take advantage of the
>>>>> structure of the data. You effectively want "column bind". There is no 
>>>>> such
>>>>> operation in Spark. (union is 'row bind'.) You could do this with
>>>>> zipPartition, which is in the RDD API, and to my surprise, not in the
>>>>> Python API but exists in Scala. And R (!). If you can read several RDDs of
>>>>> data, you can use this method to pair all their corresponding values and
>>>>> ultimately get rows of 10114 values out. In fact that is how sparklyr
>>>>> implements cbind on Spark, FWIW:
>>>>> https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>>>
>>>>>
>>>>>
>>>>> The issue I see is that you can only zip a few at a time; you don't
>>>>> want to zip 10114 of them. Perhaps you have to do that iteratively, and I
>>>>> don't know if that is going to face the same issues with huge huge plans.
>>>>>
>>>>>
>>>>>
>>>>> I 

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
Good lead, pandas on Spark concat() is worth trying. It looks like it uses
a join, but not 100% sure from the source.
The SQL concat() function is indeed a different thing.
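For reference, a small sketch of what that looks like with pandas-on-Spark (shipped as pyspark.pandas from Spark 3.2; the column names here are made up):

    import pyspark.pandas as ps

    left = ps.DataFrame({"rowid": ["r1", "r2"], "sample_a": [1.0, 2.0]}).set_index("rowid")
    right = ps.DataFrame({"rowid": ["r1", "r2"], "sample_b": [3.0, 4.0]}).set_index("rowid")

    # axis=1 is the column-bind case; it aligns the frames on the index,
    # which is why it can behave (and cost) like a join underneath.
    wide = ps.concat([left, right], axis=1)
    print(wide.to_pandas())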

On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
wrote:

> Sorry for asking. But why does`t concat work?
>
> Pandas on spark have ps.concat
> <https://github.com/apache/spark/blob/1cc2d1641c23f028b5f175f80a695891ff13a6e2/python/pyspark/pandas/namespace.py#L2299>
>  which
> takes 2 dataframes and concat them to 1 dataframe.
> It seems
> <https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat>
> like the pyspark version takes 2 columns and concat it to one column.
>
> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>
>> cbind? yeah though the answer is typically a join. I don't know if
>> there's a better option in a SQL engine, as SQL doesn't have anything to
>> offer except join and pivot either (? right?)
>> Certainly, the dominant data storage paradigm is wide tables, whereas
>> you're starting with effectively a huge number of tiny slim tables, which
>> is the impedance mismatch here.
>>
>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>> wrote:
>>
>>> Thanks Sean
>>>
>>>
>>>
>>> I imagine this is a fairly common problem in data science. Any idea how
>>> other solve?  For example I wonder if running join something like BigQuery
>>> might work better? I do not know much about the implementation.
>>>
>>>
>>>
>>> No one tool will  solve all problems. Once I get the matrix I think it
>>> spark will work well for our need
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> *From: *Sean Owen 
>>> *Date: *Monday, April 18, 2022 at 6:58 PM
>>> *To: *Andrew Davidson 
>>> *Cc: *"user @spark" 
>>> *Subject: *Re: How is union() implemented? Need to implement column bind
>>>
>>>
>>>
>>> A join is the natural answer, but this is a 10114-way join, which
>>> probably chokes readily just to even plan it, let alone all the shuffling
>>> and shuffling of huge data. You could tune your way out of it maybe, but
>>> not optimistic. It's just huge.
>>>
>>>
>>>
>>> You could go off-road and lower-level to take advantage of the structure
>>> of the data. You effectively want "column bind". There is no such operation
>>> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
>>> is in the RDD API, and to my surprise, not in the Python API but exists in
>>> Scala. And R (!). If you can read several RDDs of data, you can use this
>>> method to pair all their corresponding values and ultimately get rows of
>>> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
>>> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>>>
>>>
>>>
>>> The issue I see is that you can only zip a few at a time; you don't want
>>> to zip 10114 of them. Perhaps you have to do that iteratively, and I don't
>>> know if that is going to face the same issues with huge huge plans.
>>>
>>>
>>>
>>> I like the pivot idea. If you can read the individual files as data rows
>>> (maybe list all the file names, parallelize with Spark, write a UDF that
>>> reads the data for that file to generate the rows). If you can emit (file,
>>> index, value) and groupBy index, pivot on file (I think?) that should be
>>> about it? I think it doesn't need additional hashing or whatever. Not sure
>>> how fast it is but that seems more direct than the join, as well.
>>>
>>>
>>>
>>> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
>>>  wrote:
>>>
>>> Hi have a hard problem
>>>
>>>
>>>
>>> I have  10114 column vectors each in a separate file. The file has 2
>>> columns, the row id, and numeric values. The row ids are identical and in
>>> sort order. All the column vectors have the same number of rows. There are
>>> over 5 million rows.  I need to combine them into a single table. The row
>>> ids are very long strings. The column names are about 20 chars long.
>>>
>>>
>>>
>>> My current implementation uses join. This takes a long time on a
>>> cluster with 2 works totaling 192 vcpu and 2.8 tb of memory. It often
>>> crashes. I 

Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Sean Owen
cbind? yeah though the answer is typically a join. I don't know if there's
a better option in a SQL engine, as SQL doesn't have anything to offer
except join and pivot either (? right?)
Certainly, the dominant data storage paradigm is wide tables, whereas
you're starting with effectively a huge number of tiny slim tables, which
is the impedance mismatch here.

On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson  wrote:

> Thanks Sean
>
>
>
> I imagine this is a fairly common problem in data science. Any idea how
> other solve?  For example I wonder if running join something like BigQuery
> might work better? I do not know much about the implementation.
>
>
>
> No one tool will  solve all problems. Once I get the matrix I think it
> spark will work well for our need
>
>
>
> Kind regards
>
>
>
> Andy
>
>
>
> *From: *Sean Owen 
> *Date: *Monday, April 18, 2022 at 6:58 PM
> *To: *Andrew Davidson 
> *Cc: *"user @spark" 
> *Subject: *Re: How is union() implemented? Need to implement column bind
>
>
>
> A join is the natural answer, but this is a 10114-way join, which probably
> chokes readily just to even plan it, let alone all the shuffling and
> shuffling of huge data. You could tune your way out of it maybe, but not
> optimistic. It's just huge.
>
>
>
> You could go off-road and lower-level to take advantage of the structure
> of the data. You effectively want "column bind". There is no such operation
> in Spark. (union is 'row bind'.) You could do this with zipPartition, which
> is in the RDD API, and to my surprise, not in the Python API but exists in
> Scala. And R (!). If you can read several RDDs of data, you can use this
> method to pair all their corresponding values and ultimately get rows of
> 10114 values out. In fact that is how sparklyr implements cbind on Spark,
> FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
>
>
>
> The issue I see is that you can only zip a few at a time; you don't want
> to zip 10114 of them. Perhaps you have to do that iteratively, and I don't
> know if that is going to face the same issues with huge huge plans.
>
>
>
> I like the pivot idea. If you can read the individual files as data rows
> (maybe list all the file names, parallelize with Spark, write a UDF that
> reads the data for that file to generate the rows). If you can emit (file,
> index, value) and groupBy index, pivot on file (I think?) that should be
> about it? I think it doesn't need additional hashing or whatever. Not sure
> how fast it is but that seems more direct than the join, as well.
>
>
>
> On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson 
> wrote:
>
> Hi have a hard problem
>
>
>
> I have  10114 column vectors each in a separate file. The file has 2
> columns, the row id, and numeric values. The row ids are identical and in
> sort order. All the column vectors have the same number of rows. There are
> over 5 million rows.  I need to combine them into a single table. The row
> ids are very long strings. The column names are about 20 chars long.
>
>
>
> My current implementation uses join. This takes a long time on a cluster
> with 2 works totaling 192 vcpu and 2.8 tb of memory. It often crashes. I
> mean totally dead start over. Checkpoints do not seem  help, It still
> crashes and need to be restarted from scratch. What is really surprising
> is the final file size is only 213G ! The way got the file  was to copy
> all the column vectors to a single BIG IRON machine and used unix cut and
> paste. Took about 44 min to run once I got all the data moved around. It
> was very tedious and error prone. I had to move a lot data around. Not a
> particularly reproducible process. I will need to rerun this three more
> times on different data sets of about the same size
>
>
>
> I noticed that spark has a union function(). It implements row bind. Any
> idea how it is implemented? Is it just map reduce under the covers?
>
>
>
> My thought was
>
> 1.  load each col vector
>
> 2.  maybe I need to replace the really long row id strings with
> integers
>
> 3.  convert column vectors into row vectors using piviot (Ie matrix
> transpose.)
>
> 4.  union all the row vectors into a single table
>
> 5.  piviot the table back so I have the correct column vectors
>
>
>
> I could replace the row ids and column name with integers if needed, and
> restore them later
>
>
>
> Maybe I would be better off using many small machines? I assume memory is
> the limiting resource not cpu. I notice that memory usage will reach 100%.
> I added several TB’s of local ssd. I am not convinced that spark is using
>

Re: Grouping and counting occurences of specific column rows

2022-04-19 Thread Sean Owen
Just .groupBy(...).count() ?
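The original code is Java, but the same idea in a short PySpark sketch with made-up column names:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("group-count-demo").getOrCreate()
    df = spark.createDataFrame([("a", "x"), ("a", "x"), ("b", "y")], ["qid1", "qid2"])

    # One row per distinct combination of the grouping columns, plus its frequency.
    df.groupBy("qid1", "qid2").count().show()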

On Tue, Apr 19, 2022 at 6:24 AM marc nicole  wrote:

> Hello guys,
>
> I want to group by certain column attributes (e.g.,List
> groupByQidAttributes) a dataset (initDataset) and then count the
> occurrences of associated grouped rows, how do i achieve that neatly?
> I tried through the following code:
> Dataset groupedRowsDF = initDataset.withColumn("qidsFreqs", count("*"
> ).over(Window.partitionBy(groupByQidAttributes.toArray(new Column[
> groupByQidAttributes.size()]; Is that OK to use for the purpose?
>
>


Re: RDD memory use question

2022-04-19 Thread Sean Owen
Don't collect() - that pulls all data into memory. Use count().
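A minimal sketch of the difference:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("count-vs-collect").getOrCreate()
    rdd = spark.sparkContext.parallelize(range(1_000_000))

    # collect() copies every element into the driver's heap, which is what
    # throws the OutOfMemoryError on big data; avoid it just to get a size.
    # size = len(rdd.collect())

    # count() runs on the executors; only the final number returns to the driver.
    print(rdd.count())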

On Tue, Apr 19, 2022 at 5:34 AM wilson  wrote:

> Hello,
>
> Do you know for a big dataset why the general RDD job can be done, but
> the collect() failed due to memory overflow?
>
> for instance, for a dataset which has xxx million of items, this can be
> done well:
>
>   scala> rdd.map { x => x.split(",") }.map{ x => (x(5).toString,
> x(6).toDouble) }.groupByKey.mapValues(x =>
> x.sum/x.size).sortBy(-_._2).take(20)
>
>
> But in the final stage I issued this command and it got:
>
> scala> rdd.collect.size
> 22/04/19 18:26:52 ERROR Executor: Exception in task 13.0 in stage 44.0
> (TID 349)
> java.lang.OutOfMemoryError: Java heap space
>
>
> Thank you.
> wilson
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How is union() implemented? Need to implement column bind

2022-04-18 Thread Sean Owen
A join is the natural answer, but this is a 10114-way join, which probably
chokes just trying to plan it, let alone all the shuffling of huge data. You
could maybe tune your way out of it, but I'm not optimistic. It's just huge.

You could go off-road and lower-level to take advantage of the structure of
the data. You effectively want "column bind". There is no such operation in
Spark. (union is 'row bind'.) You could do this with zipPartition, which is
in the RDD API, and to my surprise, not in the Python API but exists in
Scala. And R (!). If you can read several RDDs of data, you can use this
method to pair all their corresponding values and ultimately get rows of
10114 values out. In fact that is how sparklyr implements cbind on Spark,
FWIW: https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html

The issue I see is that you can only zip a few at a time; you don't want to
zip 10114 of them. Perhaps you have to do that iteratively, and I don't
know if that is going to face the same issues with huge huge plans.

I like the pivot idea. If you can read the individual files as data rows
(maybe list all the file names, parallelize with Spark, write a UDF that
reads the data for that file to generate the rows). If you can emit (file,
index, value) and groupBy index, pivot on file (I think?) that should be
about it? I think it doesn't need additional hashing or whatever. Not sure
how fast it is but that seems more direct than the join, as well.
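A very rough sketch of that pivot idea in PySpark; the paths, schema, and file-name parsing are made up, and whether it holds up at 10114 files x 5 million rows would have to be tested:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("cbind-via-pivot").getOrCreate()

    # Each input file is a slim (row_id, value) column vector.
    paths = ["/data/vectors/sample_0001.csv", "/data/vectors/sample_0002.csv"]  # ...and so on

    # Read everything as one long table, tagging each row with its source file.
    long_df = (
        spark.read.csv(paths, schema="row_id STRING, value DOUBLE")
        .withColumn("source", F.regexp_extract(F.input_file_name(), r"([^/]+)\.csv$", 1))
    )

    # Long -> wide: one row per row_id, one column per source file. Supplying
    # the pivot values up front avoids an extra pass to discover them.
    sources = [p.split("/")[-1].replace(".csv", "") for p in paths]
    wide_df = long_df.groupBy("row_id").pivot("source", sources).agg(F.first("value"))

    wide_df.write.mode("overwrite").parquet("/data/wide_matrix")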

On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson 
wrote:

> Hi, I have a hard problem
>
>
>
> I have  10114 column vectors each in a separate file. The file has 2
> columns, the row id, and numeric values. The row ids are identical and in
> sort order. All the column vectors have the same number of rows. There are
> over 5 million rows.  I need to combine them into a single table. The row
> ids are very long strings. The column names are about 20 chars long.
>
>
>
> My current implementation uses join. This takes a long time on a cluster
> with 2 workers totaling 192 vcpus and 2.8 TB of memory. It often crashes; I
> mean totally dead, start over. Checkpoints do not seem to help; it still
> crashes and needs to be restarted from scratch. What is really surprising
> is the final file size is only 213 GB! The way I got the file was to copy
> all the column vectors to a single BIG IRON machine and use unix cut and
> paste. It took about 44 min to run once I got all the data moved around. It
> was very tedious and error prone. I had to move a lot of data around. Not a
> particularly reproducible process. I will need to rerun this three more
> times on different data sets of about the same size
>
>
>
> I noticed that spark has a union function(). It implements row bind. Any
> idea how it is implemented? Is it just map reduce under the covers?
>
>
>
> My thought was
>
>1. load each col vector
>2. maybe I need to replace the really long row id strings with integers
>3. convert column vectors into row vectors using piviot (Ie matrix
>transpose.)
>4. union all the row vectors into a single table
>5. piviot the table back so I have the correct column vectors
>
>
>
> I could replace the row ids and column name with integers if needed, and
> restore them later
>
>
>
> Maybe I would be better off using many small machines? I assume memory is
> the limiting resource not cpu. I notice that memory usage will reach 100%.
> I added several TB’s of local ssd. I am not convinced that spark is using
> the local disk
>
>
>
>
>
> will this perform better than join?
>
>
>
>- The rows  before the final pivot will be very very wide (over 5
>million columns)
>- There will only be 10114 rows before the pivot
>
>
>
> I assume the pivots will shuffle all the data. I assume the column vectors
> are trivial. The full-table pivot will be expensive; however, it will only
> need to be done once
>
>
>
>
>
>
>
> Comments and suggestions appreciated
>
>
>
> Andy
>
>
>
>
>


Re: [Spark Streaming] [Debug] Memory error when using NER model in Python

2022-04-18 Thread Sean Owen
It looks good; are you sure it even starts? The problem I see is that you
send a copy of the model from the driver for every task. Try broadcasting
the model instead. I'm not sure that resolves it, but it would be good
practice.
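A hedged sketch of the broadcast idea, reusing the names from the code quoted below; whether the spaCy pipeline pickles cleanly for broadcast depends on the spaCy version:

    import spacy
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, StringType

    # Load once on the driver and broadcast it, instead of capturing the model
    # in the UDF closure (which ships a serialized copy with every task).
    nerModel = spacy.load("en_core_web_sm")
    bc_model = spark.sparkContext.broadcast(nerModel)

    def obtain_ner_udf(words):
        if not words:
            return None
        entities = bc_model.value(words)  # one deserialized copy per executor
        return [w.text + "_" + w.label_ for w in entities.ents]

    ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
    nerDF = sentDF.withColumn("nerlist", ner_list("tweet"))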

On Mon, Apr 18, 2022 at 9:10 AM Xavier Gervilla 
wrote:

> Hi Team,
> 
>
> I'm developing a project that retrieves tweets on a 'host' app, streams
> them to Spark and with different operations with DataFrames obtains the
> Sentiment of the tweets and their entities applying a Sentiment model and a
> NER model respectively.
>
> The problem I've come across is that when applying the NER model, the RAM
> consumption increases until the program stops with a memory error because
> there's no memory left to execute. In addition, on SparkUI I've seen that
> there's only one executor running, the executor driver, but using htop on
> the terminal I see that the 8 cores of the instance are executing at 100%.
>
> The SparkSession is only configured to receive the tweets from the socket
> that connects with the second program that sends the tweets. The DataFrame
> goes through some processing to obtain other properties of the tweet like
> its sentiment (which causes no error even with less than 8GB of RAM) and
> then the NER is applied.
>
> spark = SparkSession.builder.appName("TwitterStreamApp").getOrCreate()
> rawTweets = spark.readStream.format("socket").option("host", "localhost").option("port", 9008).load()
> tweets = rawTweets.selectExpr("CAST(value AS STRING)")
>
> # prior processing of the tweets
> sentDF = other_processing(tweets)
>
> # obtaining the column that contains the list of entities from a tweet
> nerDF = ner_classification(sentDF)
>
>
> This is the code of the functions related to obtaining the NER, the "main
> call" and the UDF function.
>
> nerModel = spacy.load("en_core_web_sm")
>
> # main call, applies the UDF function to every tweet from the "tweet" column
> def ner_classification(words):
>     ner_list = udf(obtain_ner_udf, ArrayType(StringType()))
>     words = words.withColumn("nerlist", ner_list("tweet"))
>     return words
>
> # udf function
> def obtain_ner_udf(words):
>     # if the tweet is empty return None
>     if words == "":
>         return None
>     # else: apply the NER model (spaCy en_core_web_sm)
>     entities = nerModel(words)
>
>     # returns a list of the form ['entity1_label1', 'entity2_label2', ...]
>     return [word.text + '_' + word.label_ for word in entities.ents]
>
>
>
> And lastly I map each entity with the sentiment from its tweet and obtain
> the average sentiment of the entity and the number of appearances.
>
> flattenedNER = nerDF.select(nerDF.sentiment, explode(nerDF.nerlist))
> flattenedNER.registerTempTable("df")
>
> querySelect = "SELECT col as entity, avg(sentiment) as sentiment, count(col) as count FROM df GROUP BY col"
> finalDF = spark.sql(querySelect)
>
> query = finalDF.writeStream.foreachBatch(processBatch).outputMode("complete").start()
>
>
> The resulting DF is processed with a function that separates each column
> in a list and prints it.
>
> def processBatch(df, epoch_id):
>     entities = [str(t.entity) for t in df.select("entity").collect()]
>     sentiments = [float(t.sentiment) for t in df.select("sentiment").collect()]
>     counts = [int(row.asDict()['count']) for row in df.select("count").collect()]
>
>     print(entities, sentiments, counts)
>
>
> At first I tried with other NER models from Flair they have the same
> effect, after printing the first batch memory use starts increasing until
> it fails and stops the execution because of the memory error. When applying
> a "simple" function instead of the NER model, such as *return
> words.split()* on the UDF there's no such error so the data ingested
> should not be what's causing the overload but the model.
>
> Is there a way to prevent the excessive RAM consumption? Why is there only
> the driver executor and no other executors are generated? How could I
> prevent it from collapsing when applying the NER model?
>
> Thanks in advance!
>
>


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Sean Owen
It is not officially supported, yes. Try Spark 3.3 from the branch if you
want to try Java 17

On Wed, Apr 13, 2022, 9:36 PM Arunachalam Sibisakkaravarthi <
arunacha...@mcruncher.com> wrote:

> Thanks everyone for giving your feedback.
> Jvm option "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" resolved the
> issue "cannot access class sun.nio.ch.DirectBuffer"
> But still Spark throws some other exception
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage
> 0.0 (TID 0) (ldap executor driver): java.io.InvalidObjectException:
> ReflectiveOperationException during deserialization
> at java.base/java.lang.invoke.SerializedLambda.readResolve(Unknown Source)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
>
> Caused by: java.lang.reflect.InvocationTargetException: null
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
> Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
> Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> ... 86 common frames omitted
>
> Caused by: java.lang.IllegalArgumentException: too many arguments
> at java.base/java.lang.invoke.LambdaMetafactory.altMetafactory(Unknown
> Source)
> at
> scala.runtime.LambdaDeserializer$.makeCallSite$1(LambdaDeserializer.scala:105)
> at
> scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:114)
> at
> scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
>
> Maybe we need to change the subject to say "spark-sql_2.12 doesn't work
> with jdk17 " or should I open another discussion?
>
>
>
>
>
>
>
>
>
> *Thanks And RegardsSibi.ArunachalammCruncher*
>
>
> On Wed, Apr 13, 2022 at 10:16 PM Sean Owen  wrote:
>
>> Yes I think that's a change that has caused difficulties, but, these
>> internal APIs were always discouraged. Hey, one is even called 'unsafe'.
>> There is an escape hatch, the JVM arg below.
>>
>> On Wed, Apr 13, 2022, 9:09 AM Andrew Melo  wrote:
>>
>>> Gotcha. Seeing as there's a lot of large projects who used the unsafe
>>> API either directly or indirectly (via netty, etc..) it's a bit surprising
>>> that it was so thoroughly closed off without an escape hatch, but I'm sure
>>> there was a lively discussion around it...
>>>
>>> Cheers
>>> Andrew
>>>
>>> On Wed, Apr 13, 2022 at 09:07 Sean Owen  wrote:
>>>
>>>> It is intentionally closed by the JVM going forward, as direct access
>>>> is discouraged. But it's still necessary for Spark. In some cases, like
>>>> direct mem access, there is a new API but it's in Java 17 I think, and we
>>>> can't assume Java 17 any time soon.
>>>>
>>>> On Wed, Apr 13, 2022 at 9:05 AM Andrew Melo 
>>>> wrote:
>>>>
>>>>> Hi Sean,
>>>>>
>>>>> Out of curiosity, will Java 11+ always require special flags to access
>>>>> the unsafe direct memory interfaces, or is this something that will either
>>>>> be addressed by the spec (by making an "approved" interface) or by
>>>>> libraries (with some other workaround)?
>>>>>
>>>>> Thanks
>>>>> Andrew
>>>>>
>>>>> On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:
>>>>>
>>>>>> In Java 11+, you will need to tell the JVM to allow access to
>>>>>> internal packages in some cases, for any JVM application. You will need
>>>>>> flags like "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you
>>>>>> can see in the pom.xml file for the project.
>>>>>>
>>>>>> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
>>>>>> support), but it may well work after you address those flags.
>>>>>>
>>>>>> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
>>>>>> arunacha...@mcruncher.com> wrote:
>>>>>>
>>>>>>> Hi guys,
>>>>>>>
>>>>>>> spark-sql_2.12:3.2.1 is used in our appl

Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Sean Owen
Yes I think that's a change that has caused difficulties, but, these
internal APIs were always discouraged. Hey, one is even called 'unsafe'.
There is an escape hatch, the JVM arg below.

On Wed, Apr 13, 2022, 9:09 AM Andrew Melo  wrote:

> Gotcha. Seeing as there's a lot of large projects who used the unsafe API
> either directly or indirectly (via netty, etc..) it's a bit surprising that
> it was so thoroughly closed off without an escape hatch, but I'm sure there
> was a lively discussion around it...
>
> Cheers
> Andrew
>
> On Wed, Apr 13, 2022 at 09:07 Sean Owen  wrote:
>
>> It is intentionally closed by the JVM going forward, as direct access is
>> discouraged. But it's still necessary for Spark. In some cases, like direct
>> mem access, there is a new API but it's in Java 17 I think, and we can't
>> assume Java 17 any time soon.
>>
>> On Wed, Apr 13, 2022 at 9:05 AM Andrew Melo 
>> wrote:
>>
>>> Hi Sean,
>>>
>>> Out of curiosity, will Java 11+ always require special flags to access
>>> the unsafe direct memory interfaces, or is this something that will either
>>> be addressed by the spec (by making an "approved" interface) or by
>>> libraries (with some other workaround)?
>>>
>>> Thanks
>>> Andrew
>>>
>>> On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:
>>>
>>>> In Java 11+, you will need to tell the JVM to allow access to internal
>>>> packages in some cases, for any JVM application. You will need flags like
>>>> "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in
>>>> the pom.xml file for the project.
>>>>
>>>> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
>>>> support), but it may well work after you address those flags.
>>>>
>>>> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
>>>> arunacha...@mcruncher.com> wrote:
>>>>
>>>>> Hi guys,
>>>>>
>>>>> spark-sql_2.12:3.2.1 is used in our application.
>>>>>
>>>>> It throws following exceptions when the app runs using JRE17
>>>>>
>>>>> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x451f1bd4) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x451f1bd4
>>>>>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala:213)
>>>>>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala)
>>>>>   at org.apache.spark.storage.BlockManagerMasterEndpoint.(BlockManagerMasterEndpoint.scala:110)
>>>>>   at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>>>>>   at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>>>>>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>>>>>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>>>>>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>>>>>   at org.apache.spark.SparkContext.(SparkContext.scala:460)
>>>>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>>>>   at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>>>>   at scala.Option.getOrElse(Option.scala:189)
>>>>>   at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>>>>
>>>>> How do we fix this?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *Thanks And RegardsSibi.ArunachalammCruncher*
>>>>>
>>>> --
>>> It's dark in this basement.
>>>
>> --
> It's dark in this basement.
>


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Sean Owen
It is intentionally closed by the JVM going forward, as direct access is
discouraged. But it's still necessary for Spark. In some cases, like direct
mem access, there is a new API but it's in Java 17 I think, and we can't
assume Java 17 any time soon.

On Wed, Apr 13, 2022 at 9:05 AM Andrew Melo  wrote:

> Hi Sean,
>
> Out of curiosity, will Java 11+ always require special flags to access the
> unsafe direct memory interfaces, or is this something that will either be
> addressed by the spec (by making an "approved" interface) or by libraries
> (with some other workaround)?
>
> Thanks
> Andrew
>
> On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:
>
>> In Java 11+, you will need to tell the JVM to allow access to internal
>> packages in some cases, for any JVM application. You will need flags like
>> "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in the
>> pom.xml file for the project.
>>
>> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
>> support), but it may well work after you address those flags.
>>
>> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
>> arunacha...@mcruncher.com> wrote:
>>
>>> Hi guys,
>>>
>>> spark-sql_2.12:3.2.1 is used in our application.
>>>
>>> It throws following exceptions when the app runs using JRE17
>>>
>>> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x451f1bd4) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x451f1bd4
>>>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala:213)
>>>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala)
>>>   at org.apache.spark.storage.BlockManagerMasterEndpoint.(BlockManagerMasterEndpoint.scala:110)
>>>   at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>>>   at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>>>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>>>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>>>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>>>   at org.apache.spark.SparkContext.(SparkContext.scala:460)
>>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>>   at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>>   at scala.Option.getOrElse(Option.scala:189)
>>>   at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>>
>>> How do we fix this?
>>>
>>>
>>>
>>>
>>> *Thanks And RegardsSibi.ArunachalammCruncher*
>>>
>> --
> It's dark in this basement.
>


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-12 Thread Sean Owen
In Java 11+, you will need to tell the JVM to allow access to internal
packages in some cases, for any JVM application. You will need flags like
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in the
pom.xml file for the project.

Spark 3.2 does not necessarily work with Java 17 (3.3 should have support),
but it may well work after you address those flags.
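As a sketch, one way to pass such a flag in PySpark (only one --add-opens entry shown; the full list lives in Spark's pom.xml, and the driver's own JVM usually needs the flag supplied at launch, e.g. via spark-submit --driver-java-options or spark-defaults.conf):

    from pyspark.sql import SparkSession

    # Hypothetical minimal example: open one internal package to the executor JVMs.
    opens = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"

    spark = (
        SparkSession.builder
        .appName("jdk17-add-opens")
        .config("spark.executor.extraJavaOptions", opens)
        .getOrCreate()
    )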

On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
arunacha...@mcruncher.com> wrote:

> Hi guys,
>
> spark-sql_2.12:3.2.1 is used in our application.
>
> It throws following exceptions when the app runs using JRE17
>
> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x451f1bd4) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x451f1bd4
>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala:213)
>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala)
>   at org.apache.spark.storage.BlockManagerMasterEndpoint.(BlockManagerMasterEndpoint.scala:110)
>   at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>   at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>   at org.apache.spark.SparkContext.(SparkContext.scala:460)
>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>   at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>   at scala.Option.getOrElse(Option.scala:189)
>   at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>
> How do we fix this?
>
>
>
>
> *Thanks And RegardsSibi.ArunachalammCruncher*
>


Re: Spark Write BinaryType Column as continues file to S3

2022-04-08 Thread Sean Owen
That's for strings, but still doesn't address what is desired w.r.t.
writing a binary column

On Fri, Apr 8, 2022 at 10:31 AM Bjørn Jørgensen 
wrote:

> In the New spark 3.3 there Will be an sql function
> https://github.com/apache/spark/commit/25dd4254fed71923731fd59838875c0dd1ff665a
> hope this can help you.
>
> fre. 8. apr. 2022, 17:14 skrev Philipp Kraus <
> philipp.kraus.flashp...@gmail.com>:
>
>> Hello,
>>
>> I have got a data frame with numerical data in Spark 3.1.1 (Java) which
>> should be converted to a binary file.
>> My idea is that I create a udf function that generates a byte array based
>> on the numerical values, so I can apply this function on each row of the
>> data frame and get than a new column with row-wise binary byte data.
>> If this is done, I would like to write this column as continues byte
>> stream to a file which is stored in a S3 bucket.
>>
>> So my question is, is the idea with the udf function a good idea and is
>> it possible to write this continues byte stream directly to S3 / is there
>> any built-in functionality?
>> What is a good strategy to do this?
>>
>> Thanks for help
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark Write BinaryType Column as continues file to S3

2022-04-08 Thread Sean Owen
You can certainly write that UDF. You get a column in a DataFrame of
array type and you can write that to any appropriate format. What do
you mean by continuous byte stream? something besides, say, parquet files
holding the byte arrays?
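For illustration, a small PySpark sketch of such a UDF (the thread is on the Java API; the packing format, column names, and bucket path are made up):

    import struct
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BinaryType

    spark = SparkSession.builder.appName("binary-udf-demo").getOrCreate()
    df = spark.createDataFrame([(1, 2.5), (2, 3.75)], ["id", "measure"])

    # Pack each row's numbers into a little-endian byte array (int32 + float64).
    to_bytes = udf(lambda i, m: struct.pack("<id", i, m), BinaryType())
    binary_df = df.withColumn("payload", to_bytes("id", "measure"))

    # A columnar format stores the byte arrays row by row; producing one continuous
    # raw byte stream per object would need a custom sink or a post-processing step.
    binary_df.select("payload").write.mode("overwrite").parquet("s3a://some-bucket/binary-out/")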

On Fri, Apr 8, 2022 at 10:14 AM Philipp Kraus <
philipp.kraus.flashp...@gmail.com> wrote:

> Hello,
>
> I have got a data frame with numerical data in Spark 3.1.1 (Java) which
> should be converted to a binary file.
> My idea is that I create a udf function that generates a byte array based
> on the numerical values, so I can apply this function on each row of the
> data frame and get than a new column with row-wise binary byte data.
> If this is done, I would like to write this column as continues byte
> stream to a file which is stored in a S3 bucket.
>
> So my question is, is the idea with the udf function a good idea and is it
> possible to write this continues byte stream directly to S3 / is there any
> built-in functionality?
> What is a good strategy to do this?
>
> Thanks for help
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Aggregate over a column: the proper way to do

2022-04-08 Thread Sean Owen
Dataset.count() returns one value directly?
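In other words the whole expression collapses to a filter plus a count; a PySpark sketch of the same idea (the thread's code is Java, the names come from the question):

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("filter-count").getOrCreate()
    dataset = spark.createDataFrame(
        [("myTargetVal",), ("other",), ("myTargetVal",)], ["myCol"]
    )

    # Number of rows matching the value -- no groupBy or agg needed.
    result = dataset.filter(F.col("myCol") == "myTargetVal").count()
    print(result)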

On Thu, Apr 7, 2022 at 11:25 PM sam smith 
wrote:

> My bad, yes of course that! Still, I don't like the ..
> select("count(myCol)") .. part in my line; is there any replacement for that?
>
> Le ven. 8 avr. 2022 à 06:13, Sean Owen  a écrit :
>
>> Just do an average then? Most of my point is that filtering to one group
>> and then grouping is pointless.
>>
>> On Thu, Apr 7, 2022, 11:10 PM sam smith 
>> wrote:
>>
>>> What if i do avg instead of count?
>>>
>>> Le ven. 8 avr. 2022 à 05:32, Sean Owen  a écrit :
>>>
>>>> Wait, why groupBy at all? After the filter only rows with myCol equal
>>>> to your target are left. There is only one group. Don't group just count
>>>> after the filter?
>>>>
>>>> On Thu, Apr 7, 2022, 10:27 PM sam smith 
>>>> wrote:
>>>>
>>>>> I want to aggregate a column by counting the number of rows having the
>>>>> value "myTargetValue" and return the result
>>>>> I am doing it like the following:in JAVA
>>>>>
>>>>>> long result =
>>>>>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);
>>>>>
>>>>>
>>>>> Is that the right way? if no, what if a more optimized way to do that
>>>>> (always in JAVA)?
>>>>> Thanks for the help.
>>>>>
>>>>


Re: Aggregate over a column: the proper way to do

2022-04-07 Thread Sean Owen
Wait, why groupBy at all? After the filter, only rows with myCol equal to
your target are left. There is only one group. Don't group; just count after
the filter.

On Thu, Apr 7, 2022, 10:27 PM sam smith  wrote:

> I want to aggregate a column by counting the number of rows having the
> value "myTargetValue" and return the result
> I am doing it like the following:in JAVA
>
>> long result =
>> dataset.filter(dataset.col("myCol").equalTo("myTargetVal")).groupBy(col("myCol")).agg(count(dataset.col("myCol"))).select("count(myCol)").first().getLong(0);
>
>
> Is that the right way? If not, what is a more optimized way to do that
> (always in JAVA)?
> Thanks for the help.
>


Re: Spark 3.0.1 and spark 3.2 compatibility

2022-04-07 Thread Sean Owen
(Don't cross post please)
Generally you definitely want to compile and test vs what you're running on.
There shouldn't be many binary or source incompatibilities -- these are
avoided in a major release where possible. So it may need no code change.
But I would certainly recompile just on principle!

On Thu, Apr 7, 2022 at 12:28 PM Pralabh Kumar 
wrote:

> Hi spark community
>
> I have quick question .I am planning to migrate from spark 3.0.1 to spark
> 3.2.
>
> Do I need to recompile my application with 3.2 dependencies or application
> compiled with 3.0.1 will work fine on 3.2 ?
>
>
> Regards
> Pralabh kumar
>
>


Re: loop of spark jobs leads to increase in memory on worker nodes and eventually faillure

2022-04-01 Thread Sean Owen
This feels like premature optimization, and not clear it's optimizing, but
maybe.
Caching things that are used once is worse than not caching. It looks like
a straight-line through to the write, so I doubt caching helps anything
here.

On Fri, Apr 1, 2022 at 2:49 AM Joris Billen 
wrote:

> Hi,
> as said thanks for little discussion over mail.
> I understand that the action is triggered in the end at the write and then
> all of a sudden everything is executed at once. But I dont really need to
> trigger an action before. I am caching somewherew a df that will be reused
> several times (slightly updated pseudocode below).
>
> Question: is it then better practice to already trigger some actions on
>  intermediate data frame (like df4 and df8), and cache them? So that these
> actions will not be that expensive yet, and the actions to write at the end
> will require less resources, which would allow to process more days in one
> go? Like what is added in red in the improvement section in the pseudo code
> below?
>
>
>
> pseudocode:
>
>
> loop over all days:
> spark submit 1 day
>
>
>
> with spark submit (overly simplified) =
>
>
>   df = spark.read(hdfs://somepath)
>   …
>   ## IMPROVEMENT START
>   df4 = spark.sql(some stuff with df3)
>   spark.sql(CACHE TABLE df4)
>   …
>   df8 = spark.sql(some stuff with df7)
>   spark.sql(CACHE TABLE df8)
>   ## IMPROVEMENT END
>   ...
>   df12 = df11.spark.sql(complex stuff)
>   spark.sql(CACHE TABLE df10)
>   ...
>   df13 = spark.sql(complex stuff with df12)
>   df13.write
>   df14 = spark.sql(some other complex stuff with df12)
>   df14.write
>   df15 = spark.sql(some completely other complex stuff with df12)
>   df15.write
>
>
>
>
>
>
> Thanks!
>
>
>
> On 31 Mar 2022, at 14:37, Sean Owen  wrote:
>
> If that is your loop unrolled, then you are not doing parts of work at a
> time. That will execute all operations in one go when the write finally
> happens. That's OK, but may be part of the problem. For example if you are
> filtering for a subset, processing, and unioning, then that is just a
> harder and slower way of applying the transformation to all data at once.
>
> On Thu, Mar 31, 2022 at 3:30 AM Joris Billen <
> joris.bil...@bigindustries.be> wrote:
>
>> Thanks for reply :-)
>>
>> I am using pyspark. Basically my code (simplified) is:
>>
>> df=spark.read.csv(hdfs://somehdfslocation)
>> df1=spark.sql (complex statement using df)
>> ...
>> dfx=spark.sql(complex statement using df x-1)
>> ...
>> dfx15.write()
>>
>>
>> What exactly is meant by "closing resources"? Is it just unpersisting
>> cached dataframes at the end and stopping the spark context explicitly:
>> sc.stop()?
>>
>>
>> For processing many years at once versus a chunk in a loop: I see that if
>> I go up to a certain number of days, one iteration will start to have tasks
>> that fail. So I only take a limited number of days, and do this process
>> several times. Isn't this normal, as you are always somehow limited in terms
>> of resources (I have 9 nodes with 32GB)? Or is it like this that in theory
>> you could process any volume, in case you wait long enough? I guess Spark
>> can only break down the tasks up to a certain level (based on the datasets'
>> and the intermediate results’ partitions) and at some moment you hit the
>> limit where your resources are not sufficient anymore to process such a
>> task? Maybe you can tweak it a bit, but in the end you’ll hit a limit?
>>
>>
>>
>> Concretely  following topics would be interesting to find out more about
>> (links):
>> -where to see what you are still consuming after spark job ended if you
>> didnt close resources
>> -memory leaks for pyspark
>> -good article about closing resources (you find tons of snippets on how
>> to start spark context+ config for number/cores/memory of worker/executors
>> etc, but never saw a focus on making sure you clean up —> or is it just
>> stopping the spark context)
>>
>>
>>
>>
>> On 30 Mar 2022, at 21:24, Bjørn Jørgensen 
>> wrote:
>>
>> It`s quite impossible for anyone to answer your question about what is
>> eating your memory, without even knowing what language you are using.
>>
>> If you are using C then it`s always pointers, that's the mem issue.
>> If you are using python, there can be some like not using context manager
>> like With Context Managers and Python's with Statement
> <https://realpython.com/python-with-statement/>

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually failure

2022-03-31 Thread Sean Owen
If that is your loop unrolled, then you are not doing parts of work at a
time. That will execute all operations in one go when the write finally
happens. That's OK, but may be part of the problem. For example if you are
filtering for a subset, processing, and unioning, then that is just a
harder and slower way of applying the transformation to all data at once.

On Thu, Mar 31, 2022 at 3:30 AM Joris Billen 
wrote:

> Thanks for reply :-)
>
> I am using pyspark. Basically my code (simplified) is:
>
> df=spark.read.csv(hdfs://somehdfslocation)
> df1=spark.sql (complex statement using df)
> ...
> dfx=spark.sql(complex statement using df x-1)
> ...
> dfx15.write()
>
>
> What exactly is meant by "closing resources"? Is it just unpersisting
> cached dataframes at the end and stopping the spark context explicitly:
> sc.stop()?
>
>
> For processing many years at once versus a chunk in a loop: I see that if
> I go up to a certain number of days, one iteration will start to have tasks
> that fail. So I only take a limited number of days, and do this process
> several times. Isn't this normal, as you are always somehow limited in terms
> of resources (I have 9 nodes with 32GB)? Or is it like this that in theory
> you could process any volume, in case you wait long enough? I guess Spark
> can only break down the tasks up to a certain level (based on the datasets'
> and the intermediate results’ partitions) and at some moment you hit the
> limit where your resources are not sufficient anymore to process such a
> task? Maybe you can tweak it a bit, but in the end you’ll hit a limit?
>
>
>
> Concretely  following topics would be interesting to find out more about
> (links):
> -where to see what you are still consuming after spark job ended if you
> didnt close resources
> -memory leaks for pyspark
> -good article about closing resources (you find tons of snippets on how to
> start spark context+ config for number/cores/memory of worker/executors
> etc, but never saw a focus on making sure you clean up —> or is it just
> stopping the spark context)
>
>
>
>
> On 30 Mar 2022, at 21:24, Bjørn Jørgensen 
> wrote:
>
> It`s quite impossible for anyone to answer your question about what is
> eating your memory, without even knowing what language you are using.
>
> If you are using C then it`s always pointers, that's the mem issue.
> If you are using python, there can be some like not using context manager
> like With Context Managers and Python's with Statement
> <https://realpython.com/python-with-statement/>
>
> And another can be not to close resources after use.
>
> In my experience you can process 3 years or more of data, IF you are
> closing opened resources.
> I use the web GUI http://spark:4040 to follow what spark is doing.
>
>
>
>
> ons. 30. mar. 2022 kl. 17:41 skrev Joris Billen <
> joris.bil...@bigindustries.be>:
>
>> Thanks for answer-much appreciated! This forum is very useful :-)
>>
>> I didnt know the sparkcontext stays alive. I guess this is eating up
>> memory.  The eviction means that he knows that he should clear some of the
>> old cached memory to be able to store new one. In case anyone has good
>> articles about memory leaks I would be interested to read.
>> I will try to add following lines at the end of my job (as I cached the
>> table in spark sql):
>>
>>
>> *sqlContext.sql("UNCACHE TABLE mytableofinterest ")*
>> *spark.stop()*
>>
>>
>> Wrt looping: if I want to process 3 years of data, my modest cluster will
>> never do it one go , I would expect? I have to break it down in smaller
>> pieces and run that in a loop (1 day is already lots of data).
>>
>>
>>
>> Thanks!
>>
>>
>>
>>
>> On 30 Mar 2022, at 17:25, Sean Owen  wrote:
>>
>> The Spark context does not stop when a job does. It stops when you stop
>> it. There could be many ways mem can leak. Caching maybe - but it will
>> evict. You should be clearing caches when no longer needed.
>>
>> I would guess it is something else your program holds on to in its logic.
>>
>> Also consider not looping; there is probably a faster way to do it in one
>> go.
>>
>> On Wed, Mar 30, 2022, 10:16 AM Joris Billen <
>> joris.bil...@bigindustries.be> wrote:
>>

Re: loop of spark jobs leads to increase in memory on worker nodes and eventually failure

2022-03-30 Thread Sean Owen
The Spark context does not stop when a job does. It stops when you stop it.
There could be many ways memory can leak. Caching, maybe - but it will evict.
You should be clearing caches when no longer needed.

I would guess it is something else your program holds on to in its logic.

Also consider not looping; there is probably a faster way to do it in one
go.
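
As a sketch of what "clearing caches when no longer needed" can look like if the
loop lives inside one driver process (paths and the per-day function are
placeholders):

for day in days_to_process:
    df = spark.read.parquet(f"hdfs://.../input/{day}")
    process_and_write(df, day)          # placeholder for the per-day work

    # drop everything cached in this SparkContext before the next iteration,
    # rather than relying on eviction:
    spark.catalog.clearCache()
    # or uncache specific tables: spark.sql("UNCACHE TABLE my_table")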

On Wed, Mar 30, 2022, 10:16 AM Joris Billen 
wrote:

> Hi,
> I have a pyspark job submitted through spark-submit that does some heavy
> processing for 1 day of data. It runs with no errors. I have to loop over
> many days, so I run this spark job in a loop. I notice that after a couple of
> executions the memory is increasing on all worker nodes, and eventually this
> leads to failures. My job does some caching, but I understand that when
> the job ends successfully, the sparkcontext is destroyed and the cache
> should be cleared. However it seems that something keeps on filling the
> memory a bit more and more after each run. This is the memory behaviour
> over time, which in the end will start leading to failures:
>
> (what we see is: green=physical memory used, green-blue=physical memory
> cached, grey=memory capacity =straight line around 31GB )
> This runs on a healthy spark 2.4 and was optimized already to come to a
> stable job in terms of spark-submit resources parameters like
> driver-memory/num-executors/executor-memory/executor-cores/spark.locality.wait).
> Any clue how to “really” clear the memory in between jobs? So basically
> currently I can loop 10x and then need to restart my cluster so all memory
> is cleared completely.
>
>
> Thanks for any info!
>
>


Re: GraphX Support

2022-03-21 Thread Sean Owen
GraphX is not active, though still there and does continue to build and
test with each Spark release. GraphFrames kind of superseded it, but is
also not super active FWIW.

On Mon, Mar 21, 2022 at 6:03 PM Jacob Marquez 
wrote:

> Hello!
>
>
>
> My team and I are evaluating GraphX as a possible solution. Would someone
> be able to speak to the support of this Spark feature? Is there active
> development or is GraphX in maintenance mode (e.g. updated to ensure
> functionality with new Spark releases)?
>
>
>
> Thanks in advance for your help!
>
>
>
> --
>
> Jacob H. Marquez
>
> He/Him
>
> Data & Applied Scientist
>
> Microsoft Cloud Data Sciences
>
>
>


Re: [Spark SQL] Structured Streaming in pyhton can connect to cassandra ?

2022-03-21 Thread Sean Owen
Looks like you are trying to apply this class/function across Spark, but it
contains a non-serializable object, the connection. That has to be
initialized on use; otherwise you try to send it from the driver and that
can't work.
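
One common pattern is to open the connection inside the function that runs on
the executors, e.g. per partition, so nothing unserializable is captured from
the driver. A rough PySpark sketch (hosts, keyspace and columns are
illustrative, not taken from the gist):

def write_partition(rows):
    # runs on an executor: build the connection here, not on the driver
    from cassandra.cluster import Cluster   # assumes cassandra-driver is installed
    cluster = Cluster(["cassandra-host"])
    session = cluster.connect("my_keyspace")
    for row in rows:
        session.execute(
            "INSERT INTO messages (user, text) VALUES (%s, %s)",
            (row["user"], row["text"]),
        )
    cluster.shutdown()

query = (
    df.writeStream
      .foreachBatch(lambda batch_df, _: batch_df.foreachPartition(write_partition))
      .start()
)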

On Mon, Mar 21, 2022 at 11:51 AM guillaume farcy <
guillaume.fa...@imt-atlantique.net> wrote:

> Hello,
>
> I am a student and I am currently doing a big data project.
> Here is my code:
> https://gist.github.com/Balykoo/262d94a7073d5a7e16dfb0d0a576b9c3
>
> My project is to retrieve messages from a twitch chat and send them into
> kafka then spark reads the kafka topic to perform the processing in the
> provided gist.
>
> I will want to send these messages into cassandra.
>
> I tested a first solution on line 72 which works but when there are too
> many messages spark crashes. Probably due to the fact that my function
> connects to cassandra each time it is called.
>
> I tried the object approach to mutualize the connection object but
> without success:
> _pickle.PicklingError: Could not serialize object: TypeError: cannot
> pickle '_thread.RLock' object
>
> Can you please tell me how to do this?
> Or at least give me some advice?
>
> Sincerely,
> FARCY Guillaume.
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Continuous ML model training in stream mode

2022-03-17 Thread Sean Owen
(Thank you, not sure that was me though)
I don't know of plans to expose the streaming impls in ML, as they still
work fine in MLlib and they also don't come up much. Continuous training is
relatively rare, maybe under-appreciated, but rare in practice.
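
For reference, the MLlib streaming API looks roughly like this in PySpark (a
sketch; the input DStreams of feature vectors are assumed to exist already):

from pyspark.streaming import StreamingContext
from pyspark.mllib.clustering import StreamingKMeans

ssc = StreamingContext(spark.sparkContext, batchDuration=10)

model = StreamingKMeans(k=3, decayFactor=0.9) \
    .setRandomCenters(dim=10, weight=1.0, seed=42)

model.trainOn(training_stream)          # centers update as each batch arrives
model.predictOn(test_stream).pprint()

ssc.start()
ssc.awaitTermination()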

On Thu, Mar 17, 2022 at 1:57 PM Gourav Sengupta 
wrote:

> Dear friends,
>
> a few years ago, I was in a London meetup seeing Sean (Owen) demonstrate
> how we can try to predict the gender of individuals who are responding to
> tweets after accepting privacy agreements, in case I am not wrong.
>
> It was real time, it was spectacular, and it was the presentation that set
> me into data science and its applications.
>
> Thanks Sean! :)
>
> Regards,
> Gourav Sengupta
>
>
>
>
> On Tue, Mar 15, 2022 at 9:39 PM Artemis User 
> wrote:
>
>> Thanks Sean!  Well, it looks like we have to abandon our structured
>> streaming model to use DStream for this, or do you see possibility to use
>> structured streaming with ml instead of mllib?
>>
>> On 3/15/22 4:51 PM, Sean Owen wrote:
>>
>> There is a streaming k-means example in Spark.
>> https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means
>>
>> On Tue, Mar 15, 2022, 3:46 PM Artemis User 
>> wrote:
>>
>>> Has anyone done any experiments of training an ML model using stream
>>> data? especially for unsupervised models?   Any suggestions/references
>>> are highly appreciated...
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>


Re: [Pyspark] [Linear Regression] Can't Fit Data

2022-03-17 Thread Sean Owen
The error points you to the answer. Somewhere in your code you are parsing
dates, and the date format is no longer valid / supported. These changes
are documented at the link in the error message.
It is not related to the regression itself.
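
i.e. either fix the pattern per that guide, or restore the old parser behaviour
before the fit, e.g.:

# quick workaround suggested by the error message itself
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")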

On Thu, Mar 17, 2022 at 11:35 AM Bassett, Kenneth
 wrote:

> Hello,
>
>
>
> I am having an issue with Linear Regression when trying to fit training
> data to the model. The code below used to work, but it stopped recently.
> Spark is version 3.2.1.
>
>
>
> # Split Data into train and test data
>
> train, test = data.randomSplit([0.9, 0.1])
>
> y = 'Build_Rate'
>
>
>
> # Perform regression with train data
>
> assembler = VectorAssembler(inputCols=feature_cols, outputCol="Features")
>
> vtrain = assembler.transform(train).select('Features', y)
>
> lin_reg = LinearRegression(regParam = 0.0, elasticNetParam = 0.0,
> solver='normal', featuresCol = 'Features', labelCol = y)
>
> model = lin_reg.fit(vtrain) *FAILS HERE*
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in stage 388.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 388.0 (TID 422) (10.139.64.4 executor 0):
> org.apache.spark.SparkUpgradeException: You may get a different result due
> to the upgrading of Spark 3.0: Fail to recognize MMM dd,  hh:mm:ss
> aa pattern in the DateTimeFormatter. 1) You can set
> spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before
> Spark 3.0. 2) You can form a valid datetime pattern with the guide from
> https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
>
>
>
> The full traceback is attached.
>
>
>
> The error is confusing me because there are no datetime columns in
> “train”. “vtrain” is just “train” with the feature columns in dense vector
> form.
>
> Does anyone know how to fix this error?
>
>
>
> Thanks,
>
>
> Ken Bassett, Data Scientist
>
>
>
>
>
>
>
> 1451 Marvin Griffin Rd.
> Augusta, GA 30906
>
> (m) (706) 469-0696
>
> kbass...@textron.com
>
>
>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: calculate correlation between multiple columns and one specific column after groupby the spark data frame

2022-03-15 Thread Sean Owen
Are you just trying to avoid writing the function call 30 times? Just put
this in a loop over all the columns instead, adding a new corr column to a
list each time.
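
Something along these lines (a sketch in the Scala API; column names are taken
from your example):

import org.apache.spark.sql.functions.corr

val dataCols = df.columns.filter(_.startsWith("datacol"))
val corrExprs = dataCols.map(c => corr(c, "corr_col").as(s"corr_$c"))

// agg takes one expression plus varargs, hence head/tail
val result = df.groupBy("groupid").agg(corrExprs.head, corrExprs.tail: _*)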

On Tue, Mar 15, 2022, 10:30 PM  wrote:

> Hi all,
>
> I am stuck at  a correlation calculation problem. I have a dataframe like
> below:
> groupid  datacol1  datacol2  datacol3  datacol*  corr_col
> 1        1         2         3         4         5
> 1        2         3         4         6         5
> 2        4         2         1         7         5
> 2        8         9         3         2         5
> 3        7         1         2         3         5
> 3        3         5         3         1         5
> I want to calculate the correlation between all datacol columns and
> corr_col column by each groupid.
> So I used the following spark scala-api codes:
>
> df.groupby("groupid").agg(functions.corr("datacol1","corr_col"),functions.corr("datacol2","corr_col"),functions.corr("datacol3","corr_col"),functions.corr("datacol*","corr_col"))
>
> This is very inefficient. If I have 30 data_col columns, I need to write
> out functions.corr 30 times to calculate the correlations.
>
> I have searched, and it seems that functions.corr doesn't accept a List/Array
> parameter, and df.agg doesn't accept a function as a parameter.
> So are there any Spark Scala API calls that can do this job efficiently?
>
> Thanks
>
> Liang
>


Re: Continuous ML model training in stream mode

2022-03-15 Thread Sean Owen
There is a streaming k-means example in Spark.
https://spark.apache.org/docs/latest/mllib-clustering.html#streaming-k-means

On Tue, Mar 15, 2022, 3:46 PM Artemis User  wrote:

> Has anyone done any experiments of training an ML model using stream
> data? especially for unsupervised models?   Any suggestions/references
> are highly appreciated...
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: spark distribution build fails

2022-03-14 Thread Sean Owen
Try increasing the stack size in the build. It's the -Xss argument you find
in various parts of the pom or sbt build. I have seen this too and am not sure
why it happens in certain environments, but that's the workaround.
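
For example (values are illustrative; the Spark building docs suggest the same
kind of setting):

export MAVEN_OPTS="-Xss128m -Xmx4g -XX:ReservedCodeCacheSize=1g"
dev/make-distribution.sh --name 'hadoop-3.2' --pip --tgz -Phive \
  -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes

If the error persists, the -Xss jvmArg configured for the scala-maven-plugin in
the pom may need the same bump.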

On Mon, Mar 14, 2022, 8:59 AM Bulldog20630405 
wrote:

>
> using tag v3.2.1 with java 8 getting a stackoverflow when building the
> distribution:
>
> > alias mvn
> alias mvn='mvn --errors --fail-at-end -DskipTests '
> > dev/make-distribution.sh --name 'hadoop-3.2' --pip --tgz -Phive
> -Phive-thriftserver -Pmesos -Pyarn -Pkubernetes
>
> [INFO]
> 
> [INFO] Reactor Summary for Spark Project Parent POM 3.2.1:
> [INFO]
> [INFO] Spark Project Parent POM ... SUCCESS [
>  2.978 s]
> [INFO] Spark Project Tags . SUCCESS [
>  6.585 s]
> [INFO] Spark Project Sketch ... SUCCESS [
>  6.684 s]
> [INFO] Spark Project Local DB . SUCCESS [
>  2.497 s]
> [INFO] Spark Project Networking ... SUCCESS [
>  6.312 s]
> [INFO] Spark Project Shuffle Streaming Service  SUCCESS [
>  3.925 s]
> [INFO] Spark Project Unsafe ... SUCCESS [
>  7.879 s]
> [INFO] Spark Project Launcher . SUCCESS [
>  2.238 s]
> [INFO] Spark Project Core . SUCCESS [02:33
> min]
> [INFO] Spark Project ML Local Library . SUCCESS [
> 24.566 s]
> [INFO] Spark Project GraphX ... SUCCESS [
> 28.293 s]
> [INFO] Spark Project Streaming  SUCCESS [
> 51.070 s]
> [INFO] Spark Project Catalyst . FAILURE [
> 36.920 s]
> [INFO] Spark Project SQL .. SKIPPED
> [INFO] Spark Project ML Library ... SKIPPED
> [INFO] Spark Project Tools  SKIPPED
> 
>
> [INFO] Spark Avro . SKIPPED
> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time:  05:33 min
> [INFO] Finished at: 2022-03-14T13:45:15Z
> [INFO]
> 
> ---
> constituent[0]: file:/home/bulldog/software/maven/maven-3.8.4/conf/logging/
> constituent[1]:
> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-embedder-3.8.4.jar
> constituent[2]:
> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-settings-3.8.4.jar
> constituent[3]:
> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-settings-builder-3.8.4.jar
> constituent[4]:
> file:/home/bulldog/software/maven/maven-3.8.4/lib/maven-plugin-api-3.8.4.jar
> 
> ---
> Exception in thread "main" java.lang.StackOverflowError
> at
> scala.tools.nsc.transform.TypingTransformers$TypingTransformer.transform(TypingTransformers.scala:49)
> at
> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:275)
> at
> scala.tools.nsc.transform.ExtensionMethods$Extender.transform(ExtensionMethods.scala:133)
> ...
>
>
>
>
>
>


Re: [SPARK-38438] pyspark - how to update spark.jars.packages on existing default context?

2022-03-10 Thread Sean Owen
Wouldn't these be separately submitted jobs for separate workloads? You can
of course dynamically change each submitted job to have whatever packages
you like, from whatever is orchestrating. A single job doing everything
doesn't sound right.

On Thu, Mar 10, 2022, 12:05 PM Rafał Wojdyła  wrote:

> Because I can't (and should not) know ahead of time which jobs will be
> executed, that's the job of the orchestration layer (and can be dynamic). I
> know I can specify multiple packages. Also not worried about memory.
>
> On Thu, 10 Mar 2022 at 13:54, Artemis User  wrote:
>
>> If changing packages or jars isn't your concern, why not just specify ALL
>> packages that you would need for the Spark environment?  You know you can
>> define multiple packages under the packages option.  This shouldn't cause
>> memory issues since JVM uses dynamic class loading...
>>
>> On 3/9/22 10:03 PM, Rafał Wojdyła wrote:
>>
>> Hi Artemis,
>> Thanks for your input, to answer your questions:
>>
>> > You may want to ask yourself why it is necessary to change the jar
>> packages during runtime.
>>
>> I have a long running orchestrator process, which executes multiple spark
>> jobs, currently on a single VM/driver, some of those jobs might
>> require extra packages/jars (please see example in the issue).
>>
>> > Changing package doesn't mean to reload the classes.
>>
>> AFAIU this is unrelated
>>
>> > There is no way to reload the same class unless you customize the
>> classloader of Spark.
>>
>> AFAIU this is an implementation detail.
>>
>> > I also don't think it is necessary to implement a warning or error
>> message when changing the configuration since it doesn't do any harm
>>
>> To reiterate right now the API allows to change configuration of the
>> context, without that configuration taking effect. See example of confused
>> users here:
>>  *
>> https://stackoverflow.com/questions/41886346/spark-2-1-0-session-config-settings-pyspark
>>  *
>> https://stackoverflow.com/questions/53606756/how-to-set-spark-driver-memory-in-client-mode-pyspark-version-2-3-1
>>
>> I'm curious if you have any opinion about the "hard-reset" workaround,
>> copy-pasting from the issue:
>>
>> ```
>> s: SparkSession = ...
>>
>> # Hard reset:
>> s.stop()
>> s._sc._gateway.shutdown()
>> s._sc._gateway.proc.stdin.close()
>> SparkContext._gateway = None
>> SparkContext._jvm = None
>> ```
>>
>> Cheers - Rafal
>>
>> On 2022/03/09 15:39:58 Artemis User wrote:
>> > This is indeed a JVM issue, not a Spark issue.  You may want to ask
>> > yourself why it is necessary to change the jar packages during
>> runtime.
>> > Changing package doesn't mean to reload the classes. There is no way to
>> > reload the same class unless you customize the classloader of Spark.  I
>> > also don't think it is necessary to implement a warning or error
>> message
>> > when changing the configuration since it doesn't do any harm.  Spark
>> > uses lazy binding so you can do a lot of such "unharmful" things.
>> > Developers will have to understand the behaviors of each API before
>> when
>> > using them..
>> >
>> >
>> > On 3/9/22 9:31 AM, Rafał Wojdyła wrote:
>> > >  Sean,
>> > > I understand you might be sceptical about adding this functionality
>> > > into (py)spark, I'm curious:
>> > > * would error/warning on update in configuration that is currently
>> > > effectively impossible (requires restart of JVM) be reasonable?
>> > > * what do you think about the workaround in the issue?
>> > > Cheers - Rafal
>> > >
>> > > On Wed, 9 Mar 2022 at 14:24, Sean Owen  wrote:
>> > >
>> > > Unfortunately this opens a lot more questions and problems than it
>> > > solves. What if you take something off the classpath, for example?
>> > > change a class?
>> > >
>> > > On Wed, Mar 9, 2022 at 8:22 AM Rafał Wojdyła
>> > >  wrote:
>> > >
>> > > Thanks Sean,
>> > > To be clear, if you prefer to change the label on this issue
>> > > from bug to sth else, feel free to do so, no strong opinions
>> > > on my end. What happens to the classpath, whether spark uses
>> > > some classloader magic, is probably an implementation detail.
>> > > That said, it's definitely not intuitive that you ca

Re: spark jobs don't require the master/worker to startup?

2022-03-09 Thread Sean Owen
You can run Spark in local mode and not require any standalone master or
worker.
Are you sure you're not using local mode? Are you sure the daemons aren't
running?
What Spark master do you pass?
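
A quick way to check from inside the shell which master you actually connected
to:

# inside pyspark
print(spark.sparkContext.master)
# 'local[*]' means local mode (no master/worker daemons involved);
# a standalone cluster would show e.g. 'spark://127.0.0.1:7077'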

On Wed, Mar 9, 2022 at 7:35 PM  wrote:

> What I tried to say is, I didn't start spark master/worker at all, for a
> standalone deployment.
>
> But I still can login into pyspark to run the job. I don't know why.
>
> $ ps -efw|grep spark
> $ netstat -ntlp
>
> both the output above have no spark related info.
> And this machine is managed by myself, I know how to start spark
> correctly. But I didn't start them yet, and I can still log in to pyspark
> to run the jobs. For example:
>
> >>> df = sc.parallelize([("t1",1),("t2",2)]).toDF(["name","number"])
> >>> df.show()
> ++--+
> |name|number|
> ++--+
> |  t1| 1|
> |  t2| 2|
> ++--+
>
>
> do you know why?
> Thank you.
> frakass.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: RebaseDateTime with dynamicAllocation

2022-03-09 Thread Sean Owen
Doesn't quite seem the same. What is the rest of the error -- why did the
class fail to initialize?

On Wed, Mar 9, 2022 at 10:08 AM Andreas Weise 
wrote:

> Hi,
>
> When playing around with spark.dynamicAllocation.enabled I face the
> following error after the first round of executors have been killed.
>
> Py4JJavaError: An error occurred while calling o337.showString. :
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
> in stage 18.0 failed 4 times, most recent failure: Lost task 1.3 in stage
> 18.0 (TID 220) (10.128.6.170 executor 13): java.lang.NoClassDefFoundError:
> Could not initialize class
> org.apache.spark.sql.catalyst.util.RebaseDateTime$ at
> org.apache.spark.sql.catalyst.util.RebaseDateTime.lastSwitchJulianTs(RebaseDateTime.scala)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.rebaseTimestamp(ParquetVectorUpdaterFactory.java:1067)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.rebaseInt96(ParquetVectorUpdaterFactory.java:1088)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.access$1500(ParquetVectorUpdaterFactory.java:43)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryToSQLTimestampRebaseUpdater.decodeSingleDictionaryId(ParquetVectorUpdaterFactory.java:860)
> at
> org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdater.decodeDictionaryIds(ParquetVectorUpdater.java:75)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:216)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:298)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:196)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
> at
> org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
> Source) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_1$(Unknown
> Source) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown
> Source) at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source) at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
> at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
> at org.apache.spark.scheduler.Task.run(Task.scala:131) at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> We tested on Spark 3.2.1 k8s with these dynamicAllocation settings:
>
> spark.dynamicAllocation.enabled=true
> spark.dynamicAllocation.maxExecutors=4
> spark.dynamicAllocation.minExecutors=1
> spark.dynamicAllocation.executorIdleTimeout=30s
> spark.dynamicAllocation.shuffleTracking.enabled=true
> spark.dynamicAllocation.shuffleTracking.timeout=30s
> spark.decommission.enabled=true
>
> Might be related to SPARK-34772 /
> https://www.mail-archive.com/commits@spark.apache.org/msg50240.html but
> as this was fixed for 3.2.0 it might be worth another issue ?
>
> Best regards
> Andreas
>


Re: [SPARK-38438] pyspark - how to update spark.jars.packages on existing default context?

2022-03-09 Thread Sean Owen
Unfortunately this opens a lot more questions and problems than it solves.
What if you take something off the classpath, for example? change a class?

On Wed, Mar 9, 2022 at 8:22 AM Rafał Wojdyła  wrote:

> Thanks Sean,
> To be clear, if you prefer to change the label on this issue from bug to
> sth else, feel free to do so, no strong opinions on my end. What happens to
> the classpath, whether spark uses some classloader magic, is probably an
> implementation detail. That said, it's definitely not intuitive that you
> can change the configuration and get the context (with the updated config)
> without any warnings/errors. Also what would you recommend as a workaround
> or solution to this problem? Any comments about the workaround in the
> issue? Keep in mind that I can't restart the long running orchestration
> process (python process if that matters).
> Cheers - Rafal
>
> On Wed, 9 Mar 2022 at 13:15, Sean Owen  wrote:
>
>> That isn't a bug - you can't change the classpath once the JVM is
>> executing.
>>
>> On Wed, Mar 9, 2022 at 7:11 AM Rafał Wojdyła 
>> wrote:
>>
>>> Hi,
>>> My use case is that, I have a long running process (orchestrator) with
>>> multiple tasks, some tasks might require extra spark dependencies. It seems
>>> once the spark context is started it's not possible to update
>>> `spark.jars.packages`? I have reported an issue at
>>> https://issues.apache.org/jira/browse/SPARK-38438, together with a
>>> workaround ("hard reset of the cluster"). I wonder if anyone has a solution
>>> for this?
>>> Cheers - Rafal
>>>
>>


Re: spark jobs don't require the master/worker to startup?

2022-03-09 Thread Sean Owen
Did it start successfully? What do you mean ports were not opened?

On Wed, Mar 9, 2022 at 3:02 AM  wrote:

> Hello
>
> I have spark 3.2.0 deployed in localhost as the standalone mode.
> I even didn't run the start master and worker command:
>
>  start-master.sh
>  start-worker.sh spark://127.0.0.1:7077
>
>
> And the ports (such as 7077) were not opened there.
> But I still can login into pyspark to run the jobs.
>
> Why this happens?
>
> Thanks.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [SPARK-38438] pyspark - how to update spark.jars.packages on existing default context?

2022-03-09 Thread Sean Owen
That isn't a bug - you can't change the classpath once the JVM is executing.

On Wed, Mar 9, 2022 at 7:11 AM Rafał Wojdyła  wrote:

> Hi,
> My use case is that, I have a long running process (orchestrator) with
> multiple tasks, some tasks might require extra spark dependencies. It seems
> once the spark context is started it's not possible to update
> `spark.jars.packages`? I have reported an issue at
> https://issues.apache.org/jira/browse/SPARK-38438, together with a
> workaround ("hard reset of the cluster"). I wonder if anyone has a solution
> for this?
> Cheers - Rafal
>


Re: spark 3.2.1 download

2022-03-07 Thread Sean Owen
Hm, 3.2.1 shows up for me; it's the default. Try refreshing the page --
sometimes people have an old cached copy.

On Mon, Mar 7, 2022 at 10:30 AM Bulldog20630405 
wrote:

>
> From the website, Spark 3.2.1 was released in January 2022; however, it is not
> available for download from => https://spark.apache.org/downloads.html
> (only 3.2.0)
>
> when will spark binary 3.2.1 be available for download?
>
>
>
>


Re: [Spark SQL] Null when trying to use corr() with a Window

2022-02-28 Thread Sean Owen
The results make sense then. You want a correlation per group, right? Because
it's over the sums by ID within the group. Then currentRow is wrong; it needs
to be unbounded preceding and following.
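
i.e. something like this, keeping your column names:

val initialSetWindow = Window
  .partitionBy("group")
  .orderBy("orderCountSum")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
  // whole-group frame: every row in the group gets the same correlation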


On Mon, Feb 28, 2022 at 9:22 AM Edgar H  wrote:

> The window is defined as you said yes, unboundedPreceding and currentRow
> ordering by orderCountSum.
>
> val initialSetWindow = Window
>   .partitionBy("group")
>   .orderBy("orderCountSum")
>   .rowsBetween(Window.unboundedPreceding, Window.currentRow)
>
> I'm trying to obtain the correlation for each of the members of the group
> yes (or the accumulative per element, don't really know how to phrase
> that), and the correlation is affected by the counter used for the column,
> right? Top to bottom?
>
> Ps. Thank you so much for replying so fast!
>
> El lun, 28 feb 2022 a las 15:56, Sean Owen () escribió:
>
>> How are you defining the window? It looks like it's something like "rows
>> unbounded preceding, current" or the reverse, as the correlation varies
>> across the elements of the group as if it's computing them on 1, then 2,
>> then 3 elements. Don't you want the correlation across the group? otherwise
>> this answer is 'right' for what you're doing it seems.
>>
>> On Mon, Feb 28, 2022 at 7:49 AM Edgar H  wrote:
>>
>>> My bad completely, missed the example by a mile sorry for that, let me
>>> change a couple of things.
>>>
>>> - Got to add "id" to the initial grouping and also add more elements to
>>> the initial set;
>>>
>>> val sampleSet = Seq(
>>>   ("group1", "id1", 1, 1, 6),
>>>   ("group1", "id1", 4, 4, 6),
>>>   ("group1", "id2", 2, 2, 5),
>>>   ("group1", "id3", 3, 3, 4),
>>>   ("group2", "id1", 4, 4, 3),
>>>   ("group2", "id2", 5, 5, 2),
>>>   ("group2", "id3", 6, 6, 1),
>>>   ("group2", "id3", 15, 6, 1)
>>> )
>>>
>>> val groupedSet = initialSet
>>>   .groupBy(
>>> "group", "id"
>>>   ).agg(
>>> sum("count1").as("count1Sum"),
>>> sum("count2").as("count2Sum"),
>>> sum("orderCount").as("orderCountSum")
>>> )
>>>   .withColumn("cf", corr("count1Sum",
>>> "count2Sum").over(initialSetWindow))
>>>
>>> Now, with this in place, in case the correlation is applied, the
>>> following is shown:
>>>
>>> +--+---+-+-+-+--+
>>> | group| id|count1Sum|count2Sum|orderCountSum|cf|
>>> +--+---+-+-+-+--+
>>> |group1|id3|3|3|4|  null|
>>> |group1|id2|2|2|5|   1.0|
>>> |group1|id1|5|5|   12|   1.0|
>>> |group2|id3|   21|   12|2|  null|
>>> |group2|id2|5|5|2|   1.0|
>>> |group2|id1|4|4|3|0.9980460957560549|
>>> +--+---+-+-+-+--+
>>>
>>> Taking into account what you just mentioned... Even if the Window is
>>> only partitioned by "group", would it still be impossible to obtain a
>>> correlation? I'm trying to do like...
>>>
>>> group1 = id1, id2, id3 (and their respective counts) - apply the
>>> correlation over the set of ids within the group (without taking into
>>> account they are a sum)
>>> group2 = id1, id2, id3 (and their respective counts) - same as before
>>>
>>> However, the highest element is still null. When changing the
>>> rowsBetween call to .rowsBetween(Window.unboundedPreceding,
>>> Window.unboundedFollowing) it will just calculate the whole subset
>>> correlation. Shouldn't the first element of the correlation calculate
>>> itself?
>>>
>>


Re: [Spark SQL] Null when trying to use corr() with a Window

2022-02-28 Thread Sean Owen
How are you defining the window? It looks like it's something like "rows
unbounded preceding, current" or the reverse, as the correlation varies
across the elements of the group as if it's computing them on 1, then 2,
then 3 elements. Don't you want the correlation across the group? Otherwise
this answer is 'right' for what you're doing, it seems.

On Mon, Feb 28, 2022 at 7:49 AM Edgar H  wrote:

> My bad completely, missed the example by a mile sorry for that, let me
> change a couple of things.
>
> - Got to add "id" to the initial grouping and also add more elements to
> the initial set;
>
> val sampleSet = Seq(
>   ("group1", "id1", 1, 1, 6),
>   ("group1", "id1", 4, 4, 6),
>   ("group1", "id2", 2, 2, 5),
>   ("group1", "id3", 3, 3, 4),
>   ("group2", "id1", 4, 4, 3),
>   ("group2", "id2", 5, 5, 2),
>   ("group2", "id3", 6, 6, 1),
>   ("group2", "id3", 15, 6, 1)
> )
>
> val groupedSet = initialSet
>   .groupBy(
> "group", "id"
>   ).agg(
> sum("count1").as("count1Sum"),
> sum("count2").as("count2Sum"),
> sum("orderCount").as("orderCountSum")
> )
>   .withColumn("cf", corr("count1Sum", "count2Sum").over(initialSetWindow))
>
> Now, with this in place, in case the correlation is applied, the following
> is shown:
>
> +--+---+-+-+-+--+
> | group| id|count1Sum|count2Sum|orderCountSum|cf|
> +--+---+-+-+-+--+
> |group1|id3|3|3|4|  null|
> |group1|id2|2|2|5|   1.0|
> |group1|id1|5|5|   12|   1.0|
> |group2|id3|   21|   12|2|  null|
> |group2|id2|5|5|2|   1.0|
> |group2|id1|4|4|3|0.9980460957560549|
> +--+---+-+-+-+--+
>
> Taking into account what you just mentioned... Even if the Window is only
> partitioned by "group", would it still be impossible to obtain a
> correlation? I'm trying to do like...
>
> group1 = id1, id2, id3 (and their respective counts) - apply the
> correlation over the set of ids within the group (without taking into
> account they are a sum)
> group2 = id1, id2, id3 (and their respective counts) - same as before
>
> However, the highest element is still null. When changing the rowsBetween
> call to .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
> it will just calculate the whole subset correlation. Shouldn't the first
> element of the correlation calculate itself?
>


Re: [Spark SQL] Null when trying to use corr() with a Window

2022-02-28 Thread Sean Owen
You're computing correlations of two series of values, but each series has
one value, a sum. Correlation is not defined in this case (both variances
are undefined). Note that this is the sample correlation.

On Mon, Feb 28, 2022 at 7:06 AM Edgar H  wrote:

> Morning all, been struggling with this for a while and can't really seem
> to understand what I'm doing wrong...
>
> Having the following DataFrame I want to apply the corr function over the
> following DF;
>
> val sampleColumns = Seq("group", "id", "count1", "count2", "orderCount")
>
> val sampleSet = Seq(
>   ("group1", "id1", 1, 1, 6),
>   ("group1", "id2", 2, 2, 5),
>   ("group1", "id3", 3, 3, 4),
>   ("group2", "id4", 4, 4, 3),
>   ("group2", "id5", 5, 5, 2),
>   ("group2", "id6", 6, 6, 1)
> )
>
> val initialSet = sparkSession
>   .createDataFrame(sampleSet)
>   .toDF(sampleColumns: _*)
>
> - .show()
>
> +--+---+--+--+--+
> | group| id|count1|count2|orderCount|
> +--+---+--+--+--+
> |group1|id1| 1| 1| 6|
> |group1|id2| 2| 2| 5|
> |group1|id3| 3| 3| 4|
> |group2|id4| 4| 4| 3|
> |group2|id5| 5| 5| 2|
> |group2|id6| 6| 6| 1|
> +--+---+--+--+--+
>
> val initialSetWindow = Window
>   .partitionBy("group")
>   .orderBy("orderCountSum")
>   .rowsBetween(Window.unboundedPreceding, Window.currentRow)
>
> val groupedSet = initialSet
>   .groupBy(
> "group"
>   ).agg(
> sum("count1").as("count1Sum"),
> sum("count2").as("count2Sum"),
> sum("orderCount").as("orderCountSum")
> )
>   .withColumn("cf", corr("count1Sum", "count2Sum").over(initialSetWindow))
>
> - .show()
>
> +--+-+-+-++
> | group|count1Sum|count2Sum|orderCountSum|  cf|
> +--+-+-+-++
> |group1|6|6|   15|null|
> |group2|   15|   15|6|null|
> +--+-+-+-++
>
> When trying to apply the corr function, some of the resulting values in cf
> are null for some reason:
>
> The question is, *how can I apply corr to each of the rows within their
> subgroup (Window)?* Would like to obtain the corr value per Row and
> subgroup (group1 and group2), and even if more nested IDs were added (group
> + id) it should still work.
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sean Owen
"count distinct' does not have that problem, whether in a group-by or not.
I'm still not sure these are equivalent queries but maybe not seeing it.
Windowing makes sense when you need the whole window, or when you need
sliding windows to express the desired groups.
It may be unnecessary when your query does not need the window, just a
summary stat like 'max'. Depends.
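
For instance, a plain per-department max needs no window at all, while "top 3
per department" genuinely does:

-- summary stat: aggregation is enough
SELECT d.name AS department, MAX(e.salary) AS top_salary
FROM Department d JOIN Employee e ON e.departmentId = d.id
GROUP BY d.name;

-- top-N per group: a window (or correlated subquery) is needed
SELECT department, employee, salary
FROM (
  SELECT d.name AS department, e.name AS employee, e.salary AS salary,
         DENSE_RANK() OVER (PARTITION BY d.name ORDER BY e.salary DESC) AS rnk
  FROM Department d JOIN Employee e ON e.departmentId = d.id
) t
WHERE rnk <= 3;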

On Sun, Feb 27, 2022 at 2:14 PM Bjørn Jørgensen 
wrote:

> You are using distinct which collects everything to the driver. Soo use
> the other one :)
>
> søn. 27. feb. 2022 kl. 21:00 skrev Sid :
>
>> Basically, I am trying two different approaches for the same problem and
>> my concern is how it will behave in the case of big data if you talk about
>> millions of records. Which one would be faster? Is using windowing
>> functions a better way since it will load the entire dataset into a single
>> window and do the operations?
>>
>
>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sean Owen
Those queries look like they do fairly different things. One is selecting
top employees by salary; the other is selecting where there are fewer
than 3 distinct salaries, or something.
Not sure what the intended comparison is then; these are not equivalent
ways of doing the same thing, at least as far as I can see.

On Sun, Feb 27, 2022 at 12:30 PM Sid  wrote:

> My bad.
>
> Aggregation Query:
>
> # Write your MySQL query statement below
>
>SELECT D.Name AS Department, E.Name AS Employee, E.Salary AS Salary
> FROM Employee E INNER JOIN Department D ON E.DepartmentId = D.Id
> WHERE (SELECT COUNT(DISTINCT(Salary)) FROM Employee
>WHERE DepartmentId = E.DepartmentId AND Salary > E.Salary) < 3
> ORDER by E.DepartmentId, E.Salary DESC
>
> Time Taken: 1212 ms
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time Taken: 790 ms
>
> Thanks,
> Sid
>
>
> On Sun, Feb 27, 2022 at 11:35 PM Sean Owen  wrote:
>
>> Those two queries are identical?
>>
>> On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I am aware that if windowing functions are used, then at first it loads
>>> the entire dataset into one window, scans, and then performs the other
>>> mentioned operations for that particular window which could be slower when
>>> dealing with trillions / billions of records.
>>>
>>> I did a POC where I used an example to find the max 3 highest salary for
>>> an employee per department. So, I wrote a below queries and compared the
>>> time for it:
>>>
>>> Windowing Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 790 ms
>>>
>>> Aggregation Query:
>>>
>>> select Department,Employee,Salary from (
>>> select d.name as Department, e.name as Employee,e.salary as
>>> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
>>> rnk from Department d join Employee e on e.departmentId=d.id ) a where
>>> rnk<=3
>>>
>>> Time taken: 1212 ms
>>>
>>> But as per my understanding, the aggregation should have run faster. So,
>>> my whole point is if the dataset is huge I should force some kind of map
>>> reduce jobs like we have an option called df.groupby().reduceByGroups()
>>>
>>> So I think the aggregation query is taking more time since the dataset
>>> size here is smaller and as we all know that map reduce works faster when
>>> there is a huge volume of data. Haven't tested it yet on big data but
>>> needed some expert guidance over here.
>>>
>>> Please correct me if I am wrong.
>>>
>>> TIA,
>>> Sid
>>>
>>>
>>>
>>>


Re: Difference between windowing functions and aggregation functions on big data

2022-02-27 Thread Sean Owen
Those two queries are identical?

On Sun, Feb 27, 2022 at 11:30 AM Sid  wrote:

> Hi Team,
>
> I am aware that if windowing functions are used, then at first it loads
> the entire dataset into one window, scans, and then performs the other
> mentioned operations for that particular window which could be slower when
> dealing with trillions / billions of records.
>
> I did a POC where I used an example to find the max 3 highest salary for
> an employee per department. So, I wrote a below queries and compared the
> time for it:
>
> Windowing Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time taken: 790 ms
>
> Aggregation Query:
>
> select Department,Employee,Salary from (
> select d.name as Department, e.name as Employee,e.salary as
> Salary,dense_rank() over(partition by d.name order by e.salary desc) as
> rnk from Department d join Employee e on e.departmentId=d.id ) a where
> rnk<=3
>
> Time taken: 1212 ms
>
> But as per my understanding, the aggregation should have run faster. So,
> my whole point is if the dataset is huge I should force some kind of map
> reduce jobs like we have an option called df.groupby().reduceByGroups()
>
> So I think the aggregation query is taking more time since the dataset
> size here is smaller and as we all know that map reduce works faster when
> there is a huge volume of data. Haven't tested it yet on big data but
> needed some expert guidance over here.
>
> Please correct me if I am wrong.
>
> TIA,
> Sid
>
>
>
>


Re: Issue while creating spark app

2022-02-26 Thread Sean Owen
I don't think any of that is related, no.
How are your dependencies set up? Manually in IntelliJ, or in a build file
(Maven, Gradle)? Normally you do the latter and dependencies are taken care
of for you, but your app would definitely have to declare a dependency on
the Scala library.
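
For example, a minimal sbt build (versions are illustrative) lets IntelliJ pick
up the Scala SDK and the Spark jars automatically on import:

// build.sbt
ThisBuild / scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided"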

On Sat, Feb 26, 2022 at 4:25 PM Bitfox  wrote:

> Java SDK installed?
>
> On Sun, Feb 27, 2022 at 5:39 AM Sachit Murarka 
> wrote:
>
>> Hello ,
>>
>> Thanks for replying. I have installed the Scala plugin in IntelliJ first,
>> but it's still giving the same error:
>>
>> Cannot find project Scala library 2.12.12 for module SparkSimpleApp
>>
>> Thanks
>> Rajat
>>
>> On Sun, Feb 27, 2022, 00:52 Bitfox  wrote:
>>
>>> You need to install scala first, the current version for spark is 2.12.15
>>> I would suggest you install scala by sdk which works great.
>>>
>>> Thanks
>>>
>>> On Sun, Feb 27, 2022 at 12:10 AM rajat kumar 
>>> wrote:
>>>
 Hello Users,

 I am trying to create spark application using Scala(Intellij).
 I have installed Scala plugin in intelliJ still getting below error:-

 Cannot find project Scala library 2.12.12 for module SparkSimpleApp


 Could anyone please help what I am doing wrong?

 Thanks

 Rajat

>>>


Re: Spark Kafka Integration

2022-02-25 Thread Sean Owen
Spark 3.2.1 is compiled vs Kafka 2.8.0; the forthcoming Spark 3.3 against
Kafka 3.1.0.
It may well be mutually compatible though.
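
One low-friction way to pull the connector and its transitive dependencies with
matching versions is to let spark-submit resolve it (a sketch; swap in your own
application):

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 \
  your_app.py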

On Fri, Feb 25, 2022 at 2:40 PM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> I believe it is 3.1, but if there is a different version that “works
> better” with spark, any advice would be appreciated.  Our entire team is
> totally new to spark and kafka (this is a poc trial).
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Friday, February 25, 2022 2:30 PM
> *To:* Michael Williams (SSI) 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Kafka Integration
>
>
>
> and what version of kafka do you have 2.7?
>
>
>
> for spark 3.1.1 I needed these jar files to make it work
>
>
>
> kafka-clients-2.7.0.jar
> commons-pool2-2.9.0.jar
> spark-streaming_2.12-3.1.1.jar
> spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>
>
> HTH
>
>
> On Fri, 25 Feb 2022 at 20:15, Mich Talebzadeh 
> wrote:
>
> What is the use case? Is this for spark structured streaming?
>
>
>
> HTH
>
>
>
> On Fri, 25 Feb 2022 at 19:38, Michael Williams (SSI) <
> michael.willi...@ssigroup.com> wrote:
>
> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>
>


Re: Spark Kafka Integration

2022-02-25 Thread Sean Owen
That .jar is available on Maven, though typically you depend on it in your
app, and compile an uber JAR which will contain it and all its dependencies.
You can I suppose manage to compile an uber JAR from that dependency itself
with tools if needed.

On Fri, Feb 25, 2022 at 1:37 PM Michael Williams (SSI) <
michael.willi...@ssigroup.com> wrote:

> After reviewing Spark's Kafka Integration guide, it indicates that
> spark-sql-kafka-0-10_2.12_3.2.1.jar and its dependencies are needed for
> Spark 3.2.1 (+ Scala 2.12) to work with Kafka.  Can anybody clarify the
> cleanest, most repeatable (reliable) way to acquire these jars for
> including in a Spark Docker image without introducing version compatibility
> issues?
>
>
>
> Thank you,
>
> Mike
>
>
>


Re: DataTables 1.10.20 reported vulnerable in spark-core_2.13:3.2.1

2022-02-24 Thread Sean Owen
What is the vulnerability, and does it affect Spark? What is the remediation?
Can you try updating these and open a pull request if it works?

On Thu, Feb 24, 2022 at 7:28 AM vinodh palanisamy 
wrote:

> Hi Team,
>   We are using spark-core_2.13:3.2.1 in our project. Where in that
> version Blackduck scan reports the below the js files as vulnerable.
>
> dataTables.bootstrap4.1.10.20.min.js
> jquery.dataTables..1.10.20.min.js
>
> Please let me know if this can be fixed in my project or Datatables
> version used in the spark-core would be updated to a non vulnerable version.
>
> Regards
> Vinodh Palaniswamy
>
>


Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-24 Thread Sean Owen
On the contrary, distributed deep learning is not data parallel. It's
dominated by the need to share parameters across workers.
Gourav, I don't understand what you're looking for. Have you looked at
Petastorm and Horovod? They _use Spark_, not another platform like Ray. Why
recreate something that has worked for years? What would it matter if it were
in the Spark project? I think you're out on a limb there.
One goal of Spark is very much not to build in everything that could exist
as a library, and distributed deep learning remains an important but niche
use case. Instead Spark provides the infrastructure for these things, like
barrier mode.

On Thu, Feb 24, 2022 at 7:21 AM Bitfox  wrote:

> I have been using tensorflow for a long time, it's not hard to implement a
> distributed training job at all, either by model parallelization or data
> parallelization. I don't think there is much need to develop spark to
> support tensorflow jobs. Just my thoughts...
>
>
> On Thu, Feb 24, 2022 at 4:36 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> I do not think that there is any reason for using over engineered
>> platforms like Petastorm and Ray, except for certain use cases.
>>
>> What Ray is doing, except for certain use cases, could have been easily
>> done by SPARK, I think, had the open source community got that steer. But
>> maybe I am wrong and someone should be able to explain why the SPARK open
>> source community cannot develop the capabilities which are so natural to
>> almost all use cases of data processing in SPARK where the data gets
>> consumed by deep learning frameworks and we are asked to use Ray or
>> Petastorm?
>>
>> For those of us who are asking what does native integrations means please
>> try to compare delta between release 2.x and 3.x and koalas before 3.2 and
>> after 3.2.
>>
>> I am sure that the SPARK community can push for extending the dataframes
>> from SPARK to deep learning and other frameworks by natively integrating
>> them.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>


Re: Unable to display JSON records with null values

2022-02-23 Thread Sean Owen
There is no record "345" here, it seems, right? It's not that it exists and
has null fields; it's invalid w.r.t. the schema that the rest suggests.

On Wed, Feb 23, 2022 at 11:57 AM Sid  wrote:

> Hello experts,
>
> I have a JSON data like below:
>
> [
>   {
> "123": {
>   "Party1": {
> "FIRSTNAMEBEN": "ABC",
> "ALIASBEN": "",
> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
> "DATEOFBIRTH": "7/Oct/1969"
>   },
>   "Party2": {
> "FIRSTNAMEBEN": "ABCC",
> "ALIASBEN": "",
> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
> "DATEOFBIRTH": "7/Oct/1969"
>   }
> },
> "GeneratedTime": "2022-01-30 03:09:26"
>   },
>   {
> "456": {
>   "Party1": {
> "FIRSTNAMEBEN": "ABCD",
> "ALIASBEN": "",
> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
> "DATEOFBIRTH": "7/Oct/1969"
>   },
>   "Party2": {
> "FIRSTNAMEBEN": "ABCDD",
> "ALIASBEN": "",
> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
> "DATEOFBIRTH": "7/Oct/1969"
>   },
>   "Party3": {
> "FIRSTNAMEBEN": "ABCDDE",
> "ALIASBEN": "",
> "RELATIONSHIPTYPE": "ABC, FGHIJK LMN",
> "DATEOFBIRTH": "7/Oct/1969"
>   }
> },
> "GeneratedTime": "2022-01-30 03:09:26"
>   },
>   {
> "345": {
>
>
> },
> "GeneratedTime": "2022-01-30 03:09:26"
>   }
> ]
>
> However, when I try to display this JSON using below code, it doesn't show
> the blank records. In my case I don't get any records for 345 since it is
> null but I want to display it in the final flattened dataset.
>
> val df = spark.read.option("multiline",
> true).json("/home/siddhesh/Documents/nested_json.json")
>
> Spark version:3.1.1
>
> Thanks,
> Sid
>


Re: Loading .xlsx and .xlx files using pyspark

2022-02-23 Thread Sean Owen
The standalone koalas project should have the same functionality for older
Spark versions:
https://koalas.readthedocs.io/en/latest/

You should be moving to Spark 3 though; 2.x is EOL.
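
A rough sketch on Spark 2.4 with the standalone package (pip install koalas);
the path here is made up, and on Glue you would likely need to ship the koalas
wheel through something like --additional-python-modules:

import databricks.koalas as ks

kdf = ks.read_excel("s3://some-bucket/test_excel.xlsx")   # Koalas DataFrame backed by Spark
sdf = kdf.to_spark()                                      # back to a plain Spark DataFrame
sdf.printSchema()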

On Wed, Feb 23, 2022 at 9:06 AM Sid  wrote:

> Cool. Here, the problem is I have to run the Spark jobs on Glue ETL which
> supports Spark 2.4.3, and I don't think this distributed pandas support was
> available in that version. AFAIK, it was added in version 3.2.
>
> So how can I do it in spark 2.4.3? Correct me if I'm wrong.
>
>
> On Wed, Feb 23, 2022 at 8:28 PM Bjørn Jørgensen 
> wrote:
>
>> You will. Pandas API on spark that `imported with from pyspark import
>> pandas as ps` is not pandas but an API that is using pyspark under.
>>
>> ons. 23. feb. 2022 kl. 15:54 skrev Sid :
>>
>>> Hi Bjørn,
>>>
>>> Thanks for your reply. This doesn't help while loading huge datasets.
>>> Won't be able to achieve spark functionality while loading the file in
>>> distributed manner.
>>>
>>> Thanks,
>>> Sid
>>>
>>> On Wed, Feb 23, 2022 at 7:38 PM Bjørn Jørgensen <
>>> bjornjorgen...@gmail.com> wrote:
>>>
 from pyspark import pandas as ps


 ps.read_excel?
 "Support both `xls` and `xlsx` file extensions from a local filesystem
 or URL"

 pdf = ps.read_excel("file")

 df = pdf.to_spark()

 ons. 23. feb. 2022 kl. 14:57 skrev Sid :

> Hi Gourav,
>
> Thanks for your time.
>
> I am worried about the distribution of data in case of a huge dataset
> file. Is Koalas still a better option to go ahead with? If yes, how can I
> use it with Glue ETL jobs? Do I have to pass some kind of external jars 
> for
> it?
>
> Thanks,
> Sid
>
> On Wed, Feb 23, 2022 at 7:22 PM Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> this looks like a very specific and exact problem in its scope.
>>
>> Do you think that you can load the data into a pandas dataframe and load
>> it back to SPARK using a PANDAS UDF?
>>
>> Koalas is now natively integrated with SPARK, try to see if you can
>> use those features.
>>
>>
>> Regards,
>> Gourav
>>
>> On Wed, Feb 23, 2022 at 1:31 PM Sid  wrote:
>>
>>> I have an excel file which unfortunately cannot be converted to CSV
>>> format and I am trying to load it using pyspark shell.
>>>
>>> I tried invoking the below pyspark session with the jars provided.
>>>
>>> pyspark --jars
>>> /home/siddhesh/Downloads/spark-excel_2.12-0.14.0.jar,/home/siddhesh/Downloads/xmlbeans-5.0.3.jar,/home/siddhesh/Downloads/commons-collections4-4.4.jar,/home/siddhesh/Downloads/poi-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-5.2.0.jar,/home/siddhesh/Downloads/poi-ooxml-schemas-4.1.2.jar,/home/siddhesh/Downloads/slf4j-log4j12-1.7.28.jar,/home/siddhesh/Downloads/log4j-1.2-api-2.17.1.jar
>>>
>>> and below is the code to read the excel file:
>>>
>>> df = spark.read.format("excel") \
>>>  .option("dataAddress", "'Sheet1'!") \
>>>  .option("header", "true") \
>>>  .option("inferSchema", "true") \
>>> .load("/home/.../Documents/test_excel.xlsx")
>>>
>>> It is giving me the below error message:
>>>
>>>  java.lang.NoClassDefFoundError: org/apache/logging/log4j/LogManager
>>>
>>> I tried several Jars for this error but no luck. Also, what would be
>>> the efficient way to load it?
>>>
>>> Thanks,
>>> Sid
>>>
>>

 --
 Bjørn Jørgensen
 Vestre Aspehaug 4, 6010 Ålesund
 Norge

 +47 480 94 297

>>>
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>>
>


Re: Loading .xlsx and .xlx files using pyspark

2022-02-23 Thread Sean Owen
This isn't pandas, it's pandas on Spark. It's distributed.

On Wed, Feb 23, 2022 at 8:55 AM Sid  wrote:

> Hi Bjørn,
>
> Thanks for your reply. This doesn't help while loading huge datasets.
> Won't be able to achieve spark functionality while loading the file in
> distributed manner.
>
> Thanks,
> Sid
>
> On Wed, Feb 23, 2022 at 7:38 PM Bjørn Jørgensen 
> wrote:
>
>> from pyspark import pandas as ps
>>
>>
>> ps.read_excel?
>> "Support both `xls` and `xlsx` file extensions from a local filesystem or
>> URL"
>>
>> pdf = ps.read_excel("file")
>>
>> df = pdf.to_spark()
>>
>>


Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-23 Thread Sean Owen
Petastorm does that https://github.com/uber/petastorm in the sense that
it feeds Spark DFs to those frameworks in distributed training.
I'm not sure what you mean by native integration that is different? these
tools do just what you are talking about and have for a while.
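
For reference, a minimal sketch of the Petastorm Spark converter (the cache
directory, column names and model are assumptions, not from this thread):

from petastorm.spark import SparkDatasetConverter, make_spark_converter

spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///tmp/petastorm_cache")

converter = make_spark_converter(df.select("features", "label"))   # materializes the DataFrame once
with converter.make_tf_dataset(batch_size=64) as tf_dataset:
    # tf_dataset is a tf.data.Dataset; each batch is a namedtuple of tensors,
    # one field per selected column, ready to feed into a training loop
    for batch in tf_dataset.take(1):
        print(batch)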

On Wed, Feb 23, 2022 at 7:06 AM Gourav Sengupta <
gourav.sengupta.develo...@gmail.com> wrote:

> Hi,
>
> I am sure those who have actually built a data processing pipeline whose
> contents have to be then delivered to tensorflow or pytorch (not for POC,
> or writing a blog to get clicks, or resolving symptomatic bugs, but in real
> life end-to-end application), will perhaps understand some of  the issues
> because SPARK dataframes do not natively integrate with tensorflow/
> pytorch.
>
> But perhaps I am wrong.
>
> My point of mentioning Ray is simple, it is based on the fact that if
> SPARK were to be able to natively scale out and distribute data to
> tensorflow, or pytorch then there will be competition between Ray and SPARK.
>
> Regards,
> Gourav Sengupta
>
> On Wed, Feb 23, 2022 at 12:35 PM Sean Owen  wrote:
>
>> Spark does do distributed ML, but not Tensorflow. Barrier execution mode
>> is an element that things like Horovod uses. Not sure what you are getting
>> at?
>> Ray is not Spark.
>> As I say -- Horovod does this already. The upside over TF distributed is
>> that Spark sets up and manages the daemon processes rather than doing it by
>> hand.
>>
>>
>> On Wed, Feb 23, 2022 at 2:43 AM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> the SPARK community should have been able to build distributed ML
>>> capabilities, and as far as I remember that was the idea initially behind
>>> SPARK 3.x roadmap (barrier execution mode,
>>> https://issues.apache.org/jira/browse/SPARK-24579).
>>>
>>> Ray, another Berkeley Labs output like SPARK, is trying to capture that
>>> market space.
>>>
>>> I am not sure whether there is any steer by the SPARK community leaders
>>> to seriously prioritise building those capabilities at all. But I am sure
>>> if the brilliant and fantastic minds behind SPARK did actually want to
>>> allow building those capabilities, they can easily do so, and achieve that
>>> :)
>>>
>>> I would sincerely request the open source SPARK community to prioritise
>>> building the SPARK capabilities to scale ML applications.
>>>
>>>
>>>
>>> Thanks and Regards,
>>> Gourav Sengupta
>>>
>>> On Wed, Feb 23, 2022 at 3:53 AM Bitfox  wrote:
>>>
>>>> TensorFlow itself can implement distributed computing via a
>>>> parameter server. Why did you want Spark here?
>>>>
>>>> regards.
>>>>
>>>> On Wed, Feb 23, 2022 at 11:27 AM Vijayant Kumar
>>>>  wrote:
>>>>
>>>>> Thanks Sean for your response. !!
>>>>>
>>>>>
>>>>>
>>>>> Want to add some more background here.
>>>>>
>>>>>
>>>>>
>>>>> I am using Spark3.0+ version with Tensorflow 2.0+.
>>>>>
>>>>> My use case is not for the image data but for the Time-series data
>>>>> where I am using LSTM and transformers to forecast.
>>>>>
>>>>>
>>>>>
>>>>> I evaluated *SparkFlow* and *spark_tensorflow_distributor *libraries, and
>>>>> there has been no major development recently on those libraries. I faced
>>>>> the issue of version dependencies on those and had a hard time fixing the
>>>>> library compatibilities. Hence a couple of below doubts:-
>>>>>
>>>>>
>>>>>
>>>>>- Does *Horovod* have any dependencies?
>>>>>- Any other library which is suitable for my use case.?
>>>>>- Any example code would really be of great help to understand.
>>>>>
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Vijayant
>>>>>
>>>>>
>>>>>
>>>>> *From:* Sean Owen 
>>>>> *Sent:* Wednesday, February 23, 2022 8:40 AM
>>>>> *To:* Vijayant Kumar 
>>>>> *Cc:* user @spark 
>>>>> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>>>>>
>>>>>
>>>>>
>>>>> *Email is from a Free Mail Service (Gmail/Yahoo/Hotm

Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-23 Thread Sean Owen
Spark does do distributed ML, but not Tensorflow. Barrier execution mode is
an element that things like Horovod uses. Not sure what you are getting at?
Ray is not Spark.
As I say -- Horovod does this already. The upside over TF distributed is
that Spark sets up and manages the daemon processes rather than doing it by
hand.


On Wed, Feb 23, 2022 at 2:43 AM Gourav Sengupta 
wrote:

> Hi,
>
> the SPARK community should have been able to build distributed ML
> capabilities, and as far as I remember that was the idea initially behind
> SPARK 3.x roadmap (barrier execution mode,
> https://issues.apache.org/jira/browse/SPARK-24579).
>
> Ray, another Berkeley Labs output like SPARK, is trying to capture that
> market space.
>
> I am not sure whether there is any steer by the SPARK community leaders to
> seriously prioritise building those capabilities at all. But I am sure if
> the brilliant and fantastic minds behind SPARK did actually want to allow
> building those capabilities, they can easily do so, and achieve that :)
>
> I would sincerely request the open source SPARK community to prioritise
> building the SPARK capabilities to scale ML applications.
>
>
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Wed, Feb 23, 2022 at 3:53 AM Bitfox  wrote:
>
>> TensorFlow itself can implement distributed computing via a
>> parameter server. Why did you want Spark here?
>>
>> regards.
>>
>> On Wed, Feb 23, 2022 at 11:27 AM Vijayant Kumar
>>  wrote:
>>
>>> Thanks Sean for your response. !!
>>>
>>>
>>>
>>> Want to add some more background here.
>>>
>>>
>>>
>>> I am using Spark3.0+ version with Tensorflow 2.0+.
>>>
>>> My use case is not for the image data but for the Time-series data where
>>> I am using LSTM and transformers to forecast.
>>>
>>>
>>>
>>> I evaluated *SparkFlow* and *spark_tensorflow_distributor *libraries, and
>>> there has been no major development recently on those libraries. I faced
>>> the issue of version dependencies on those and had a hard time fixing the
>>> library compatibilities. Hence a couple of below doubts:-
>>>
>>>
>>>
>>>- Does *Horovod* have any dependencies?
>>>- Any other library which is suitable for my use case.?
>>>- Any example code would really be of great help to understand.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Vijayant
>>>
>>>
>>>
>>> *From:* Sean Owen 
>>> *Sent:* Wednesday, February 23, 2022 8:40 AM
>>> *To:* Vijayant Kumar 
>>> *Cc:* user @spark 
>>> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>>>
>>>
>>>
>>> *Email is from a Free Mail Service (Gmail/Yahoo/Hotmail….) *: Beware of
>>> Phishing Scams, Report questionable emails to s...@mavenir.com
>>>
>>> Sure, Horovod is commonly used on Spark for this:
>>>
>>> https://horovod.readthedocs.io/en/stable/spark_include.html
>>>
>>>
>>>
>>> On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar <
>>> vijayant.ku...@mavenir.com.invalid> wrote:
>>>
>>> Hi All,
>>>
>>>
>>>
>>> Anyone using Apache spark with TensorFlow for building models. My
>>> requirement is to use TensorFlow distributed model training across the
>>> Spark executors.
>>>
>>> Please help me with some resources or some sample code.
>>>
>>>
>>>
>>> Thanks,
>>>
>>> Vijayant
>>> --
>>>
>>> This e-mail message may contain confidential or proprietary information
>>> of Mavenir Systems, Inc. or its affiliates and is intended solely for the
>>> use of the intended recipient(s). If you are not the intended recipient of
>>> this message, you are hereby notified that any review, use or distribution
>>> of this information is absolutely prohibited and we request that you delete
>>> all copies in your control and contact us by e-mailing to
>>> secur...@mavenir.com. This message contains the views of its author and
>>> may not necessarily reflect the views of Mavenir Systems, Inc. or its
>>> affiliates, who employ systems to monitor email messages, but make no
>>> representation that such messages are authorized, secure, uncompromised, or
>>> free from computer viruses, malware, or other defects. Thank You
>>>
>>> --
>>>
>>> This e-mail message may contain confidential or proprietary information
>>> of Mavenir Systems, Inc. or its affiliates and is intended solely for the
>>> use of the intended recipient(s). If you are not the intended recipient of
>>> this message, you are hereby notified that any review, use or distribution
>>> of this information is absolutely prohibited and we request that you delete
>>> all copies in your control and contact us by e-mailing to
>>> secur...@mavenir.com. This message contains the views of its author and
>>> may not necessarily reflect the views of Mavenir Systems, Inc. or its
>>> affiliates, who employ systems to monitor email messages, but make no
>>> representation that such messages are authorized, secure, uncompromised, or
>>> free from computer viruses, malware, or other defects. Thank You
>>>
>>


Re: [E] COMMERCIAL BULK: Re: TensorFlow on Spark

2022-02-22 Thread Sean Owen
Dependencies? Sure like any python library.  What are you asking about
there?

I don't know of a modern alternative on Spark.

Did you read the docs or search? Plenty of examples

On Tue, Feb 22, 2022, 9:27 PM Vijayant Kumar 
wrote:

> Thanks Sean for your response. !!
>
>
>
> Want to add some more background here.
>
>
>
> I am using Spark3.0+ version with Tensorflow 2.0+.
>
> My use case is not for the image data but for the Time-series data where I
> am using LSTM and transformers to forecast.
>
>
>
> I evaluated *SparkFlow* and *spark_tensorflow_distributor *libraries, and
> there has been no major development recently on those libraries. I faced
> the issue of version dependencies on those and had a hard time fixing the
> library compatibilities. Hence a couple of below doubts:-
>
>
>
>- Does *Horovod* have any dependencies?
>- Any other library which is suitable for my use case.?
>- Any example code would really be of great help to understand.
>
>
>
> Thanks,
>
> Vijayant
>
>
>
> *From:* Sean Owen 
> *Sent:* Wednesday, February 23, 2022 8:40 AM
> *To:* Vijayant Kumar 
> *Cc:* user @spark 
> *Subject:* [E] COMMERCIAL BULK: Re: TensorFlow on Spark
>
>
>
> *Email is from a Free Mail Service (Gmail/Yahoo/Hotmail….) *: Beware of
> Phishing Scams, Report questionable emails to s...@mavenir.com
>
> Sure, Horovod is commonly used on Spark for this:
>
> https://horovod.readthedocs.io/en/stable/spark_include.html
>
>
>
> On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar <
> vijayant.ku...@mavenir.com.invalid> wrote:
>
> Hi All,
>
>
>
> Anyone using Apache spark with TensorFlow for building models. My
> requirement is to use TensorFlow distributed model training across the
> Spark executors.
>
> Please help me with some resources or some sample code.
>
>
>
> Thanks,
>
> Vijayant
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>


Re: TensorFlow on Spark

2022-02-22 Thread Sean Owen
Sure, Horovod is commonly used on Spark for this:
https://horovod.readthedocs.io/en/stable/spark_include.html
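
A bare-bones sketch of the horovod.spark run API (details are in the docs
linked above; the train body here is only a placeholder):

import horovod.spark
import horovod.tensorflow.keras as hvd

def train():
    hvd.init()
    # build a Keras model here, wrap its optimizer with hvd.DistributedOptimizer,
    # shard the data by hvd.rank() / hvd.size(), then call model.fit(...)
    return hvd.rank()

# launches num_proc training processes as Spark tasks and collects their return values
ranks = horovod.spark.run(train, num_proc=2)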

On Tue, Feb 22, 2022 at 8:51 PM Vijayant Kumar
 wrote:

> Hi All,
>
>
>
> Anyone using Apache spark with TensorFlow for building models. My
> requirement is to use TensorFlow distributed model training across the
> Spark executors.
>
> Please help me with some resources or some sample code.
>
>
>
> Thanks,
>
> Vijayant
> --
>
> This e-mail message may contain confidential or proprietary information of
> Mavenir Systems, Inc. or its affiliates and is intended solely for the use
> of the intended recipient(s). If you are not the intended recipient of this
> message, you are hereby notified that any review, use or distribution of
> this information is absolutely prohibited and we request that you delete
> all copies in your control and contact us by e-mailing to
> secur...@mavenir.com. This message contains the views of its author and
> may not necessarily reflect the views of Mavenir Systems, Inc. or its
> affiliates, who employ systems to monitor email messages, but make no
> representation that such messages are authorized, secure, uncompromised, or
> free from computer viruses, malware, or other defects. Thank You
>


Re: Need to make WHERE clause compulsory in Spark SQL

2022-02-22 Thread Sean Owen
Spark does not use Hive for execution, so Hive params will not have an
effect. I don't think you can enforce that in Spark. Typically you enforce
things like that at a layer above your SQL engine, or can do so, because
there is probably other access you need to lock down.

On Tue, Feb 22, 2022 at 6:35 AM Saurabh Gulati
 wrote:

> Hello,
> We are trying to set up Spark as the execution engine for exposing our data
> stored in the lake. We have hive metastore running along with Spark thrift
> server and are using Superset as the UI.
>
> We save all tables as external tables in the hive metastore, with storage
> on cloud.
>
> We see that right now when users run a query in Superset SQL Lab it scans
> the whole table. What we want is to limit the data scan by setting
> something like hive.mapred.mode=strict in spark, so that users get an
> exception if they don't specify a partition column.
>
> We tried setting spark.hadoop.hive.mapred.mode=strict in
> spark-defaults.conf in the thrift server, but it still scans the whole table.
> We also tried setting hive.mapred.mode=strict in hive-defaults.conf for
> the metastore container.
>
> We use Spark 3.2 with hive-metastore version 3.1.2
>
> Is there a way in spark settings to make it happen.
>
>
> TIA
> Saurabh
>


Re: Question about spark.sql min_by

2022-02-21 Thread Sean Owen
From the source code, it looks like this function was added to pyspark in
Spark 3.3, up for release soon. It exists in SQL. You can still use it in
SQL with `spark.sql(...)` in Python though, not hard.
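
In the meantime, a minimal sketch of reaching min_by from the DataFrame API
via expr(), which avoids the temp view (column names taken from the question
below):

import pyspark.sql.functions as F

cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.expr("min_by(sellerId, price)").alias("sellerId"),
           F.min("price").alias("price"))
)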

On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
wrote:

> Hello all,
>
> I'm trying to use the spark.sql min_by aggregation function with pyspark.
> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>
> I have a dataframe made of these columns:
> - productId : int
> - sellerId : int
> - price : double
>
> For each product, I want to get the seller who sells the product for the
> cheapest price.
>
> Naive approach would be to do this, but I would expect two shuffles:
>
> import spark.sql.functions as F
> cheapest_prices_df  =
> df.groupby('productId').agg(F.min('price').alias('price'))
> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
> 'price'])
>
> I would have loved to do this instead:
>
> import spark.sql.functions as F
> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
> F.min_by('sellerId', 'price'))
>
> Unfortunately min_by does not seem available in pyspark sql functions,
> whereas I can see it in the doc :
> https://spark.apache.org/docs/latest/api/sql/index.html
>
> I have managed to use min_by with this approach but it looks slow (maybe
> because of temp table creation ?):
>
> df.createOrReplaceTempView("table")
> cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
> min(price) from table group by productId")
>
> Is there a way I can rely on min_by directly in groupby ?
> Is there some code missing in pyspark wrapper to make min_by visible
> somehow ?
>
> Thank you in advance for your help.
>
> Cheers
> David
>


Re: Encoders.STRING() causing performance problems in Java application

2022-02-21 Thread Sean Owen
Oh, yes of course. If you run an entire distributed Spark job for one row,
over and over, that's much slower. It would make much more sense to run the
whole data set at once - the point is parallelism here.
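
The same idea in PySpark terms, just as a sketch (the code in this thread is
Java, and all_texts / pipeline are placeholders): build one DataFrame holding
every text and run a single fit/transform over it, instead of one Spark job
per string.

texts_df = spark.createDataFrame([(t,) for t in all_texts], ["text"])

fitted = pipeline.fit(texts_df)              # fit once
predictions = fitted.transform(texts_df)     # one distributed pass over all rows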

On Mon, Feb 21, 2022 at 2:36 AM  wrote:

> Thanks a lot, Sean, for the comments. I realize I didn't provide enough
> background information to properly diagnose this issue.
>
> In the meantime, I have created some test cases for isolating the problem
> and running some specific performance tests. The numbers are quite
> revealing: Running our Spark model individually on Strings takes about 8
> Sec for the test data, whereas is take 88 ms when run on the entire data in
> a single Dataset. This is a factor of 100x. This gets even worse for larger
> datasets.
>
> So, the root cause here is the way the Spark model is being called for one
> string at a time by the self-built prediction pipeline (which is also using
> other ML techniques apart from Spark). Needs some re-factoring...
>
> Thanks again for the help.
>
> Cheers,
>
> Martin
>
>
> Am 2022-02-18 13:41, schrieb Sean Owen:
>
> That doesn't make a lot of sense. Are you profiling the driver, rather
> than executors where the work occurs?
> Is your data set quite small such that small overheads look big?
> Do you even need Spark if your data is not distributed - coming from the
> driver anyway?
>
> The fact that a static final field did anything suggests something is
> amiss with your driver program. Are you perhaps inadvertently serializing
> your containing class with a bunch of other data by using its methods in a
> closure?
> If your data is small it's not surprising that the overhead could be in
> just copying the data around, the two methods you cite, rather than the
> compute.
> Too many things here to really say what's going on.
>
>
> On Fri, Feb 18, 2022 at 12:42 AM  wrote:
>
> Hello,
>
> I am working on optimising the performance of a Java ML/NLP application
> based on Spark / SparkNLP. For prediction, I am applying a trained model on
> a Spark dataset which consists of one column with only one row. The dataset
> is created like this:
>
> List textList = Collections.singletonList(text);
> Dataset data = sparkSession
> .createDataset(textList, Encoders.STRING())
> .withColumnRenamed(COL_VALUE, COL_TEXT);
>
>
> The predictions are created like this:
>
> PipelineModel fittedPipeline = pipeline.fit(dataset);
>
> Dataset prediction = fittedPipeline.transform(dataset);
>
>
> We noticed that the performance isn't quite as good as expected. After
> profiling the application with VisualVM, I noticed that the problem is with
> org.apache.spark.sql.Encoders.STRING() in the creation of the dataset,
> which by itself takes up about 75% of the time for the whole prediction
> method call.
>
> So, is there a simpler and more efficient way of creating the required
> dataset, consisting of one column and one String row?
>
> Thanks a lot.
>
> Cheers,
>
> Martin
>
>


Re: Apache spark 3.0.3 [Spark lower version enhancements]

2022-02-18 Thread Sean Owen
> Vulnerability scan findings (condensed from a line-wrapped table; only the
> recoverable columns are shown):
>
> - CVE-2019-20330 -- jackson-databind: lacks certain net.sf.ehcache blocking;
>   fixed in 2.8.11.5, 2.9.10.2 (avd.aquasec.com/nvd/cve-2019-20330)
> - CVE-2018-5968 (HIGH) -- jackson-databind: unsafe deserialization due to
>   incomplete blacklist (incomplete fix for CVE-2017-7525 and...);
>   fixed in 2.7.9.5, 2.8.11.1, 2.9.4 (avd.aquasec.com/nvd/cve-2018-5968)
> - CVE-2020-35490 -- jackson-databind: mishandles the interaction between
>   serialization gadgets and typing, related to
>   org.apache.commons.dbcp2.datasources.PerUserPoolDataSource...;
>   fixed in 2.9.10.8 (avd.aquasec.com/nvd/cve-2020-35490)
> - CVE-2020-35491 -- jackson-databind: mishandles the interaction between
>   serialization gadgets and typing, related to
>   org.apache.commons.dbcp2.datasources.SharedPoolDataSource...
>   (avd.aquasec.com/nvd/cve-2020-35491)
> - CVE-2018-1000873 (MEDIUM) -- jackson-modules-java8: DoS due to an Improper
>   Input Validation; fixed in 2.9.8 (avd.aquasec.com/nvd/cve-2018-1000873)
>
>
> Rajesh Krishnamurthy | Enterprise Architect
> T: +1 510-833-7189 | M: +1 925-917-9208
> http://www.perforce.com
> Visit us on: Twitter
> <https://nam12.safelinks.protection.ou

Re: Encoders.STRING() causing performance problems in Java application

2022-02-18 Thread Sean Owen
That doesn't make a lot of sense. Are you profiling the driver, rather than
executors where the work occurs?
Is your data set quite small such that small overheads look big?
Do you even need Spark if your data is not distributed - coming from the
driver anyway?

The fact that a static final field did anything suggests something is amiss
with your driver program. Are you perhaps inadvertently serializing your
containing class with a bunch of other data by using its methods in a
closure?
If your data is small it's not surprising that the overhead could be in
just copying the data around, the two methods you cite, rather than the
compute.
Too many things here to really say what's going on.


On Fri, Feb 18, 2022 at 12:42 AM  wrote:

> Hello,
>
> I am working on optimising the performance of a Java ML/NLP application
> based on Spark / SparkNLP. For prediction, I am applying a trained model on
> a Spark dataset which consists of one column with only one row. The dataset
> is created like this:
>
> List textList = Collections.singletonList(text);
> Dataset data = sparkSession
> .createDataset(textList, Encoders.STRING())
> .withColumnRenamed(COL_VALUE, COL_TEXT);
>
>
> The predictions are created like this:
>
> PipelineModel fittedPipeline = pipeline.fit(dataset);
>
> Dataset prediction = fittedPipeline.transform(dataset);
>
>
> We noticed that the performance isn't quite as good as expected. After
> profiling the application with VisualVM, I noticed that the problem is with
> org.apache.spark.sql.Encoders.STRING() in the creation of the dataset,
> which by itself takes up about 75% of the time for the whole prediction
> method call.
>
> So, is there a simpler and more efficient way of creating the required
> dataset, consisting of one column and one String row?
>
> Thanks a lot.
>
> Cheers,
>
> Martin
>


Re: Implementing circuit breaker pattern in Spark

2022-02-16 Thread Sean Owen
There's nothing wrong with calling microservices this way. Something needs
to call the service with all the data arriving, and Spark is fine for
executing arbitrary logic including this kind of thing.
Kafka does not change that?

On Wed, Feb 16, 2022 at 9:24 AM Gourav Sengupta 
wrote:

> Hi,
> once again, just trying to understand the problem first.
>
> Why are we using SPARK to place calls to micro services? There are several
> reasons why this should never happen, including costs/ security/
> scalability concerns, etc.
>
> Is there a way that you can create a producer and put the data into Kafka
> first?
>
> Sorry, I am not suggesting any solutions, just trying to understand the
> problem first.
>
>
> Regards,
> Gourav
>
>
>
> On Wed, Feb 16, 2022 at 2:36 PM S  wrote:
>
>> No, I want the job to stop and end once it discovers on repeated retries
>> that the microservice is not responding. But I think I got where you were
>> going right after sending my previous mail. Basically, repeatedly failing
>> tasks on retries ultimately fails the job anyway, so that's an in-built
>> circuit breaker. What that essentially means is that we should not be
>> catching those HTTP 5XX exceptions (which we currently do) but should let
>> the tasks fail on their own, only for Spark to retry them a finite number
>> of times and then fail, thereby breaking the circuit. Thanks.
>>
>> On Wed, Feb 16, 2022 at 7:59 PM Sean Owen  wrote:
>>
>>> You stop the Spark job by tasks failing repeatedly, that's already how
>>> it works. You can't kill the driver from the executor other ways, but
>>> should not need to. I'm not clear, you're saying you want to stop the job,
>>> but also continue processing?
>>>
>>> On Wed, Feb 16, 2022 at 7:58 AM S  wrote:
>>>
>>>> Retries have already been implemented. The question is how to stop the
>>>> spark job by having an executor JVM send a signal to the driver JVM. e.g. I
>>>> have a microbatch of 30 messages; 10 in each of the 3 partitions. Let's say
>>>> while a partition of 10 messages was being processed, first 3 went through
>>>> but then the microservice went down. Now when the 4th message in the
>>>> partition is sent to the microservice it keeps receiving 5XX on every retry
>>>> e.g. 5 retries. What I now want is to have that task from that executor JVM
>>>> send a signal to the driver JVM to terminate the spark job on the failure
>>>> of the 5th retry. Currently, what we have in place is retrying it 5 times
>>>> and then upon failure i.e. 5XX catch the exception and move the message to
>>>> a DLQ thereby having the flatmap produce a *None* and proceed to the
>>>> next message in the partition of that microbatch. This approach keeps the
>>>> pipeline alive and keeps pushing messages to DLQ microbatch after
>>>> microbatch until the microservice is back up.
>>>>
>>>>
>>>> On Wed, Feb 16, 2022 at 6:50 PM Sean Owen  wrote:
>>>>
>>>>> You could use the same pattern in your flatMap function. If you want
>>>>> Spark to keep retrying though, you don't need any special logic, that is
>>>>> what it would do already. You could increase the number of task retries
>>>>> though; see the spark.excludeOnFailure.task.* configurations.
>>>>>
>>>>> You can just implement the circuit breaker pattern directly too,
>>>>> nothing special there, though I don't think that's what you want? you
>>>>> actually want to retry the failed attempts, not just avoid calling the
>>>>> microservice.
>>>>>
>>>>> On Wed, Feb 16, 2022 at 3:18 AM S  wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a spark job that calls a microservice in the lambda function
>>>>>> of the flatmap transformation  -> passes to this microservice, the 
>>>>>> inbound
>>>>>> element in the lambda function and returns the transformed value or 
>>>>>> "None"
>>>>>> from the microservice as an output of this flatMap transform. Of course 
>>>>>> the
>>>>>> lambda also takes care of exceptions from the microservice etc.. The
>>>>>> question is: there are times when the microservice may be down and there 
>>>>>> is
>>>>>> no point recording an exception and putting the message in the DLQ for
>>>>>> every elemen

Re: Implementing circuit breaker pattern in Spark

2022-02-16 Thread Sean Owen
You stop the Spark job by tasks failing repeatedly, that's already how
it works. You can't kill the driver from the executor other ways, but
should not need to. I'm not clear, you're saying you want to stop the job,
but also continue processing?

On Wed, Feb 16, 2022 at 7:58 AM S  wrote:

> Retries have already been implemented. The question is how to stop the
> spark job by having an executor JVM send a signal to the driver JVM. e.g. I
> have a microbatch of 30 messages; 10 in each of the 3 partitions. Let's say
> while a partition of 10 messages was being processed, first 3 went through
> but then the microservice went down. Now when the 4th message in the
> partition is sent to the microservice it keeps receiving 5XX on every retry
> e.g. 5 retries. What I now want is to have that task from that executor JVM
> send a signal to the driver JVM to terminate the spark job on the failure
> of the 5th retry. Currently, what we have in place is retrying it 5 times
> and then upon failure i.e. 5XX catch the exception and move the message to
> a DLQ thereby having the flatmap produce a *None* and proceed to the next
> message in the partition of that microbatch. This approach keeps the
> pipeline alive and keeps pushing messages to DLQ microbatch after
> microbatch until the microservice is back up.
>
>
> On Wed, Feb 16, 2022 at 6:50 PM Sean Owen  wrote:
>
>> You could use the same pattern in your flatMap function. If you want
>> Spark to keep retrying though, you don't need any special logic, that is
>> what it would do already. You could increase the number of task retries
>> though; see the spark.excludeOnFailure.task.* configurations.
>>
>> You can just implement the circuit breaker pattern directly too, nothing
>> special there, though I don't think that's what you want? you actually want
>> to retry the failed attempts, not just avoid calling the microservice.
>>
>> On Wed, Feb 16, 2022 at 3:18 AM S  wrote:
>>
>>> Hi,
>>>
>>> We have a spark job that calls a microservice in the lambda function of
>>> the flatmap transformation  -> passes to this microservice, the inbound
>>> element in the lambda function and returns the transformed value or "None"
>>> from the microservice as an output of this flatMap transform. Of course the
>>> lambda also takes care of exceptions from the microservice etc.. The
>>> question is: there are times when the microservice may be down and there is
>>> no point recording an exception and putting the message in the DLQ for
>>> every element in our streaming pipeline so long as the microservice stays
>>> down. Instead, what we want to be able to do is retry the microservice call
>>> for a given event a predefined number of times and, if the service is found
>>> to be down, terminate the spark job so that this current microbatch is
>>> terminated, there is no next microbatch, and the rest of the messages
>>> therefore stay in the source kafka topics, unpolled and unprocessed, until
>>> the microservice is back up and the spark job is
>>> redeployed again. In regular microservices, we can implement this using the
>>> Circuit breaker pattern. In Spark jobs however this would mean, being able
>>> to somehow send a signal from an executor JVM to the driver JVM to
>>> terminate the Spark job. Is there a way to do that in Spark?
>>>
>>> P.S.:
>>> - Having the circuit breaker functionality helps specificize the purpose
>>> of the DLQ to data or schema issues only instead of infra/network related
>>> issues.
>>> - As far as the need for the Spark job to use microservices is
>>> concerned, think of it as a complex logic being maintained in a
>>> microservice that does not warrant duplication.
>>> - checkpointing is being taken care of manually and not using spark's
>>> default checkpointing mechanism.
>>>
>>> Regards,
>>> Sheel
>>>
>>
>
> --
>
> Best Regards,
>
> Sheel Pancholi
>


Re: Implementing circuit breaker pattern in Spark

2022-02-16 Thread Sean Owen
You could use the same pattern in your flatMap function. If you want Spark
to keep retrying though, you don't need any special logic, that is what it
would do already. You could increase the number of task retries though; see
the spark.excludeOnFailure.task.* configurations.

You can just implement the circuit breaker pattern directly too, nothing
special there, though I don't think that's what you want? you actually want
to retry the failed attempts, not just avoid calling the microservice.
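
As a rough sketch of that "let the task fail" approach (call_service and
ServiceUnavailable stand in for the real microservice client; nothing here is
from the original job):

import time

def call_with_retries(record, max_attempts=5):
    for attempt in range(max_attempts):
        try:
            return [call_service(record)]      # happy path: one output element
        except ServiceUnavailable:
            time.sleep(2 ** attempt)           # back off and retry within the task
    # do not swallow the final failure: raising here fails the task, Spark
    # re-runs it up to spark.task.maxFailures times, and then aborts the job
    raise RuntimeError("microservice still down after %d attempts" % max_attempts)

transformed = rdd.flatMap(call_with_retries)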

On Wed, Feb 16, 2022 at 3:18 AM S  wrote:

> Hi,
>
> We have a spark job that calls a microservice in the lambda function of
> the flatmap transformation  -> passes to this microservice, the inbound
> element in the lambda function and returns the transformed value or "None"
> from the microservice as an output of this flatMap transform. Of course the
> lambda also takes care of exceptions from the microservice etc.. The
> question is: there are times when the microservice may be down and there is
> no point recording an exception and putting the message in the DLQ for
> every element in our streaming pipeline so long as the microservice stays
> down. Instead, what we want to be able to do is retry the microservice call
> for a given event a predefined number of times and, if the service is found
> to be down, terminate the spark job so that this current microbatch is
> terminated, there is no next microbatch, and the rest of the messages
> therefore stay in the source kafka topics, unpolled and unprocessed, until
> the microservice is back up and the spark job is
> redeployed again. In regular microservices, we can implement this using the
> Circuit breaker pattern. In Spark jobs however this would mean, being able
> to somehow send a signal from an executor JVM to the driver JVM to
> terminate the Spark job. Is there a way to do that in Spark?
>
> P.S.:
> - Having the circuit breaker functionality helps specificize the purpose
> of the DLQ to data or schema issues only instead of infra/network related
> issues.
> - As far as the need for the Spark job to use microservices is concerned,
> think of it as a complex logic being maintained in a microservice that does
> not warrant duplication.
> - checkpointing is being taken care of manually and not using spark's
> default checkpointing mechanism.
>
> Regards,
> Sheel
>

