Re: [Spark SQL, intermediate+] possible bug or weird behavior of insertInto

2021-03-04 Thread Jeff Evans
Why not perform a df.select(...) before the final write to ensure a
consistent ordering?
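
A rough sketch of that select-before-write idea in PySpark, in case it helps: reorder the DataFrame's columns to the target table's order, so the position-based insert lines up with the names. align_to_table and insert_aligned are hypothetical helper names (not Spark APIs), and the sketch assumes the DataFrame and the table carry exactly the same set of column names.

```python
def align_to_table(df_columns, table_columns):
    """Return the DataFrame's column names in the target table's order.

    Raises ValueError when the two sets of names differ, so a mismatch
    fails loudly instead of writing values into the wrong column.
    """
    missing = [c for c in table_columns if c not in df_columns]
    extra = [c for c in df_columns if c not in table_columns]
    if missing or extra:
        raise ValueError(f"column mismatch: missing={missing}, extra={extra}")
    return list(table_columns)


def insert_aligned(spark, df, table_name, overwrite=False):
    """Hypothetical wrapper: select by name in table order, then insertInto."""
    target_columns = spark.table(table_name).columns
    ordered = align_to_table(df.columns, target_columns)
    df.select(*ordered).write.insertInto(table_name, overwrite=overwrite)
```

Handing users a wrapper like this instead of a bare insertInto call is one way to keep them from hitting the problem, since I'm not aware of a config property that switches insertInto to name-based matching.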

On Thu, Mar 4, 2021, 7:39 AM Oldrich Vlasic 
wrote:

> Thanks for reply! Is there something to be done, setting a config property
> for example? I'd like to prevent users (mainly data scientists) from
> falling victim to this.
> --
> *From:* Russell Spitzer 
> *Sent:* Wednesday, March 3, 2021 3:31 PM
> *To:* Sean Owen 
> *Cc:* Oldrich Vlasic ; user <
> user@spark.apache.org>; Ondřej Havlíček 
> *Subject:* Re: [Spark SQL, intermediate+] possible bug or weird behavior
> of insertInto
>
> Yep this is the behavior for Insert Into, using the other write apis does
> schema matching I believe.
>
> On Mar 3, 2021, at 8:29 AM, Sean Owen  wrote:
>
> I don't have any good answer here, but, I seem to recall that this is
> because of SQL semantics, which follows column ordering not naming when
> performing operations like this. It may well be as intended.
>
> On Tue, Mar 2, 2021 at 6:10 AM Oldrich Vlasic <
> oldrich.vla...@datasentics.com> wrote:
>
> Hi,
>
> I have encountered a weird and potentially dangerous behaviour of Spark
> concerning partial overwrites of partitioned data. Not sure if this is a
> bug or just abstraction leak. I have checked Spark section of Stack
> Overflow and haven't found any relevant questions or answers.
>
> Full minimal working example provided as attachment. Tested on Databricks
> runtime 7.3 LTS ML (Spark 3.0.1). Short summary:
>
> Write dataframe using partitioning by a column using saveAsTable. Filter
> out part of the dataframe, change some values (simulates new increment of
> data) and write again, overwriting a subset of partitions using
> insertInto. This operation will either fail on schema mismatch or cause
> data corruption.
>
> Reason: on the first write, the ordering of the columns is changed
> (partition column is placed at the end). On the second write this is not
> taken into consideration and Spark tries to insert values into the columns
> based on their order and not on their name. If they have different types
> this will fail. If not, values will be written to incorrect columns
> causing data corruption.
>
> My question: is this a bug or intended behaviour? Can something be done
> about it to prevent it? This issue can be avoided by doing a select with
> schema loaded from the target table. However, when user is not aware this
> could cause hard to track down errors in data.
>
> Best regards,
> Oldřich Vlašic
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: how to serve data over JDBC using simplest setup

2021-02-18 Thread Jeff Evans
It sounds like the tool you're after, then, is a distributed SQL engine
like Presto.  But I could be totally misunderstanding what you're trying to
do.

On Thu, Feb 18, 2021 at 1:48 PM Scott Ribe 
wrote:

> I have a client side piece that needs access via JDBC.
>
> > On Feb 18, 2021, at 12:45 PM, Jeff Evans 
> wrote:
> >
> > If the data is already in Parquet files, I don't see any reason to
> involve JDBC at all.  You can read Parquet files directly into a
> DataFrame.
> https://spark.apache.org/docs/latest/sql-data-sources-parquet.html
>
>


Re: how to serve data over JDBC using simplest setup

2021-02-18 Thread Jeff Evans
If the data is already in Parquet files, I don't see any reason to involve
JDBC at all.  You can read Parquet files directly into a DataFrame.
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

On Thu, Feb 18, 2021 at 1:42 PM Scott Ribe 
wrote:

> I need a little help figuring out how some pieces fit together. I have
> some tables in parquet files, and I want to access them using SQL over
> JDBC. I gather that I need to run the thrift server, but how do I configure
> it to load my files into datasets and expose views?
>
> The context is this: trying to figure out if we want to use Spark for
> historical data, and so far, just using spark shell for some experiments:
>
> - I have established that we can easily export to Parquet and it is very
> efficient at storing this data
> - Spark SQL queries the data with reasonable performance
>
> Now I am at the step of testing whether the client-side that we are
> considering can deal effectively with querying the volume of data.
>
> Which is why I'm looking for the simplest setup. If the client integration
> works, then yes we move on to configuring a proper cluster. (And it is a
> real question, I've already had one potential client-side piece be totally
> incompetent at handling a decent volume of data...)
>
> (The environment I am working in is just the straight download of
> spark-3.0.1-bin-hadoop3.2)
>
> --
> Scott Ribe
> scott_r...@elevated-dev.com
> https://www.linkedin.com/in/scottribe/
>
>
>
>
>
>


Re: How to modify a field in a nested struct using pyspark

2021-01-29 Thread Jeff Evans
If you need to do this in 2.x, this library does the trick:
https://github.com/fqaiser94/mse
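
If you'd rather not add a dependency, here is a rough sketch of the rebuild-the-whole-struct approach Adam describes below, written so it can be reused across many entities. millis_struct_expr and with_timestamp_struct are hypothetical helpers. The sketch assumes every listed field holds epoch milliseconds, that default (non-ANSI) cast behaviour is in effect, and that dropping any struct field you do not list is acceptable.

```python
def millis_struct_expr(struct_col, fields):
    """Build a Spark SQL expression that rebuilds `struct_col` with each
    listed field converted from epoch milliseconds to a timestamp."""
    parts = []
    for field in fields:
        parts.append(f"'{field}'")
        # Dividing by 1000 yields seconds; a NULL input stays NULL.
        parts.append(f"CAST({struct_col}.{field} / 1000 AS TIMESTAMP)")
    return "named_struct(" + ", ".join(parts) + ")"


def with_timestamp_struct(df, struct_col, fields):
    """Replace `struct_col` on df with the rebuilt struct (needs pyspark)."""
    from pyspark.sql import functions as F  # imported lazily on purpose

    expression = millis_struct_expr(struct_col, fields)
    return df.withColumn(struct_col, F.expr(expression))
```

For the example in this thread the call would be something like with_timestamp_struct(df, "timingPeriod", ["start", "end"]), using whatever the real field names are.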

On Fri, Jan 29, 2021 at 12:15 PM Adam Binford  wrote:

> I think they're voting on the next release candidate starting sometime
> next week. So hopefully barring any other major hurdles within the next few
> weeks.
>
> On Fri, Jan 29, 2021, 1:01 PM Felix Kizhakkel Jose <
> felixkizhakkelj...@gmail.com> wrote:
>
>> Wow, that's really great to know. Thank you so much Adam. Do you know
>> when the 3.1 release is scheduled?
>>
>> Regards,
>> Felix K Jose
>>
>> On Fri, Jan 29, 2021 at 12:35 PM Adam Binford  wrote:
>>
>>> As of 3.0, the only way to do it is something that will recreate the
>>> whole struct:
>>> df.withColumn('timingPeriod',
>>> f.struct(f.col('timingPeriod.start').cast('timestamp').alias('start'),
>>> f.col('timingPeriod.end').cast('timestamp').alias('end')))
>>>
>>> There's a new method coming in 3.1 on the column class called withField
>>> which was designed for this purpose. I backported it to my personal 3.0
>>> build because of how useful it is. It works something like:
>>> df.withColumn('timingPeriod', f.col('timingPeriod').withField('start',
>>> f.col('timingPeriod.start').cast('timestamp')).withField('end',
>>> f.col('timingPeriod.end')))
>>>
>>> And it works on multiple levels of nesting which is nice.
>>>
>>> On Fri, Jan 29, 2021 at 11:32 AM Felix Kizhakkel Jose <
>>> felixkizhakkelj...@gmail.com> wrote:
>>>
>>>> Hello All,
>>>>
>>>> I am using pyspark structured streaming and I am getting timestamp
>>>> fields as plain long (milliseconds), so I have to modify these fields
>>>> into a timestamp type.
>>>>
>>>> A sample JSON object:
>>>>
>>>> {
>>>>   "id": {
>>>>     "value": "f40b2e22-4003-4d90-afd3-557bc013b05e",
>>>>     "type": "UUID",
>>>>     "system": "Test"
>>>>   },
>>>>   "status": "Active",
>>>>   "timingPeriod": {
>>>>     "startDateTime": 1611859271516,
>>>>     "endDateTime": null
>>>>   },
>>>>   "eventDateTime": 1611859272122,
>>>>   "isPrimary": true
>>>> }
>>>>
>>>> Here I want to convert "eventDateTime", "startDateTime" and
>>>> "endDateTime" to timestamp types.
>>>>
>>>> So I have done the following:
>>>>
>>>> def transform_date_col(date_col):
>>>>     return f.when(f.col(date_col).isNotNull(), f.col(date_col) / 1000)
>>>>
>>>> df.withColumn(
>>>>     "eventDateTime",
>>>>     transform_date_col("eventDateTime").cast("timestamp")).withColumn(
>>>>     "timingPeriod.start",
>>>>     transform_date_col("timingPeriod.start").cast("timestamp")).withColumn(
>>>>     "timingPeriod.end",
>>>>     transform_date_col("timingPeriod.end").cast("timestamp"))
>>>>
>>>> The timingPeriod fields are not a struct anymore; they become two
>>>> separate top-level fields named "timingPeriod.start" and
>>>> "timingPeriod.end".
>>>>
>>>> How can I get them as a struct as before?
>>>> Is there a generic way I can modify a single/multiple properties of
>>>> nested structs?
>>>>
>>>> I have hundreds of entities where the long needs to be converted to
>>>> timestamp, so a generic implementation will help my data ingestion
>>>> pipeline a lot.
>>>>
>>>> Regards,
>>>> Felix K Jose


>>>
>>> --
>>> Adam Binford
>>>
>>


Re: unsubscribe

2020-12-16 Thread Jeff Evans
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e

On Wed, Dec 16, 2020, 6:45 AM 张洪斌  wrote:

> How do I unsubscribe from this?
>
> Sent from NetEase Mail Master
> On 2020-12-16 20:43, 张洪斌 wrote:
>
> unsubscribe
> Student 张洪斌
> Email: hongbinzh...@163.com
>
> Signature customized by NetEase Mail Master
>
>


Re: Unsubscribe

2020-12-09 Thread Jeff Evans
That's not how to unsubscribe.
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e

On Wed, Dec 9, 2020 at 9:26 AM Bhavya Jain  wrote:

> unsubscribe
>


Re: Running the driver on a laptop but data is on the Spark server

2020-11-25 Thread Jeff Evans
In your situation, I'd try to do one of the following (in decreasing order
of personal preference)

   1. Restructure things so that you can operate on a local data file, at
   least for the purpose of developing your driver logic.  Don't rely on the
   Metastore or HDFS until you have to.  Structure the application logic so it
   operates on a DataFrame (or Dataset) and doesn't care where it came
   from.  Build this data file from your real data (probably a small subset).
   2. Develop the logic using spark-shell running on a cluster node, since
   the environment will be all set up already (which, of course, you already
   mentioned).
   3. Set up remote debugging of the driver, open an SSH tunnel to the
   node, and connect from your local laptop to debug/iterate.  Figure out the
   fastest way to rebuild the jar and scp it up to try again.


On Wed, Nov 25, 2020 at 9:35 AM Ryan Victory  wrote:

> A key part of what I'm trying to do involves NOT having to bring the data
> "through" the driver in order to get the cluster to work on it (which would
> involve a network hop from server to laptop and another from laptop to
> server). I'd rather have the data stay on the server and the driver stay on
> my laptop if possible, but I'm guessing the Spark APIs/topology wasn't
> designed that way.
>
> What I was hoping for was some way to be able to say val df =
> spark.sql("SELECT * FROM parquet.`local:///opt/data/transactions.parquet`")
> or similar to convince Spark to not move the data. I'd imagine if I used
> HDFS, data locality would kick in anyways to prevent the network shuffles
> between the driver and the cluster, but even then I wonder (based on what
> you guys are saying) if I'm wrong.
>
> Perhaps I'll just have to modify the workflow to move the JAR to the
> server and execute it from there. This isn't ideal but it's better than
> nothing.
>
> -Ryan
>
> On Wed, Nov 25, 2020 at 9:13 AM Chris Coutinho 
> wrote:
>
>> I'm also curious if this is possible, so while I can't offer a solution
>> maybe you could try the following.
>>
>> The driver and executor nodes need to have access to the same
>> (distributed) file system, so you could try to mount the file system to
>> your laptop, locally, and then try to submit jobs and/or use the
>> spark-shell while connected to the same system.
>>
>> A quick google search led me to find this article where someone shows how
>> to mount an HDFS locally. It appears that Cloudera supports some kind of
>> FUSE-based library, which may be useful for your use-case.
>>
>> https://idata.co.il/2018/10/how-to-connect-hdfs-to-local-filesystem/
>>
>> On Wed, 2020-11-25 at 08:51 -0600, Ryan Victory wrote:
>>
>> Hello!
>>
>> I have been tearing my hair out trying to solve this problem. Here is my
>> setup:
>>
>> 1. I have Spark running on a server in standalone mode with data on the
>> filesystem of the server itself (/opt/data/).
>> 2. I have an instance of a Hive Metastore server running (backed by
>> MariaDB) on the same server
>> 3. I have a laptop where I am developing my spark jobs (Scala)
>>
>> I have configured Spark to use the metastore and set the warehouse
>> directory to be in /opt/data/warehouse/. What I am trying to accomplish are
>> a couple of things:
>>
>> 1. I am trying to submit Spark jobs (via JARs) using spark-submit, but
>> have the driver run on my local machine (my laptop). I want the jobs to use
>> the data ON THE SERVER and not try to reference it from my local machine.
>> If I do something like this:
>>
>> val df = spark.sql("SELECT * FROM
>> parquet.`/opt/data/transactions.parquet`")
>>
>> I get an error that the path doesn't exist (because it's trying to find
>> it on my laptop). If I run the same thing in a spark-shell on the spark
>> server itself, there isn't an issue because the driver has access to the
>> data. If I submit the job with --deploy-mode cluster then it works too
>> because the driver is on the cluster. I don't want this, I want to get the
>> results on my laptop.
>>
>> How can I force Spark to read the data from the cluster's filesystem and
>> not the driver's?
>>
>> 2. I have setup a Hive Metastore and created a table (in the spark shell
>> on the spark server itself). The data in the warehouse is in the local
>> filesystem. When I create a spark application JAR and try to run it from my
>> laptop, I get the same problem as #1, namely that it tries to find the
>> warehouse directory on my laptop itself.
>>
>> Am I crazy? Perhaps this isn't a supported way to use Spark? Any help or
>> insights are much appreciated!
>>
>> -Ryan Victory
>>
>>
>>


Re: Spark as computing engine vs spark cluster

2020-10-12 Thread Jeff Evans
Spark is a computation engine that runs on a set of distributed nodes.  You
must "bring your own" hardware, although of course there are hosted
solutions available.

On Sat, Oct 10, 2020 at 9:24 AM Santosh74  wrote:

> Is Spark a compute engine only, or is it also a cluster that comes with a
> set of hardware/nodes? What exactly is a Spark cluster?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
>
>


Re: Distribute entire columns to executors

2020-09-24 Thread Jeff Evans
I think you can just select the columns you need into new DataFrames, then
process those separately.

val dfFirstTwo = ds.select("Col1", "Col2")
// do whatever with this one
dfFirstTwo.sort(...)
// similar for the next two columns
val dfNextTwo = ds.select("Col3", "Col4")
dfNextTwo.sort(...)

These should result in separate tasks, which you could confirm by checking
the Spark UI when the application is submitted.

On Thu, Sep 24, 2020 at 7:01 AM Pedro Cardoso 
wrote:

> Hello,
>
> Is it possible in Spark to map partitions such that partitions are
> column-based and not row-based?
> My use-case is to compute temporal series of numerical values.
> I.e: Exponential moving averages over the values of a given dataset's
> column.
>
> Suppose there is a dataset with roughly 200 columns, a high percentage of
> which are numerical (> 60%) and at least one timestamp column, as shown in
> the attached file.
>
> I want to shuffle data to executors such that each executor has a smaller
> dataset with only 2 columns, [Col0: Timestamp, Col: Numerical type].
> Over which I can then sort the dataset by increasing timestamp and then
> iterate over the rows with a custom function which receives a tuple:
> {timestamp; value}.
>
> Partitioning by column value does not make sense for me since there is a
> temporal lineage of values which I must keep. On the other hand I would
> like to parallelize this workload as my datasets can be quite big (> 2
> billion rows). The only way I see how is to distribute the entire columns
> so that each executor has 2B timestamp + numerical values rather than
> 2B*size of an entire row.
>
> Is this possible in Spark? Can someone point me in the right direction? A
> code snippet example (not working is fine if the logic is sound) would be
> highly appreciated!
>
> Thank you for your time.
> --
>
> *Pedro Cardoso*
>
> *Research Engineer*
>
> pedro.card...@feedzai.com
>
>
> *The content of this email is confidential and intended for the recipient
> specified in message only. It is strictly prohibited to share any part of
> this message with any third party, without a written consent of the sender.
> If you received this message by mistake, please reply to this message and
> follow with its deletion, so that we can ensure such a mistake does not
> occur in the future.*


Re: Unsubscribe

2020-08-26 Thread Jeff Evans
That is not how you unsubscribe.  See here for instructions:
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e

On Wed, Aug 26, 2020, 4:22 PM Annabel Melongo
 wrote:

> Please remove me from the mailing list
>


Re: Garbage collection issue

2020-07-20 Thread Jeff Evans
What is your heap size, and JVM vendor/version?  Generally, G1 only
outperforms CMS on large heap sizes (ex: 31GB or larger).

On Mon, Jul 20, 2020 at 1:22 PM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma  wrote:
>
>> Hi All, I am running the same batch job on two separate Spark
>> clusters. On one of the clusters the Spark UI shows a GC warning under
>> the Executors tab: garbage collection takes around 20% of the time,
>> while on the other cluster it is under 10%. I am using the same
>> configuration in my spark-submit and using G1GC.
>>
>> Please let me know what could be the reason for GC slowness.
>>
>>
>> Thanks
>> Amit
>>
>


Re: Using spark.jars conf to override jars present in spark default classpath

2020-07-16 Thread Jeff Evans
If you can't avoid it, you need to make use of the
spark.driver.userClassPathFirst and/or spark.executor.userClassPathFirst
properties.
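
As a rough illustration, this is how those two properties could be passed on a spark-submit command line. user_classpath_first_args is a hypothetical helper and the jar path in the usage note is a placeholder. Both properties are documented as experimental, and as far as I recall the driver one only takes effect in cluster mode.

```python
def user_classpath_first_args(jars, driver=True, executor=True):
    """Build spark-submit arguments that ship `jars` and ask Spark to
    prefer user-added jars over its own when loading classes."""
    args = []
    if driver:
        args += ["--conf", "spark.driver.userClassPathFirst=true"]
    if executor:
        args += ["--conf", "spark.executor.userClassPathFirst=true"]
    if jars:
        # --jars takes a single comma-separated list.
        args += ["--jars", ",".join(jars)]
    return args
```

A full command could then be assembled as ["spark-submit", *user_classpath_first_args(["/path/to/sample-project-3.0.0.jar"]), "app.jar"] and handed to subprocess.run.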

On Thu, Jul 16, 2020 at 2:03 PM Russell Spitzer 
wrote:

> I believe the main issue here is that spark.jars is a bit "too late" to
> actually prepend things to the class path. For most use cases this value is
> not read until after the JVM has already started and the system classloader
> has already loaded.
>
> The jar argument gets added via the dynamic class loader so it necessarily
> has to come afterwards :/ Driver extra classpath and its friends modify
> the actual launch command of the driver (or executors) so they can prepend
> whenever they want.
>
>  In general you do not want to have conflicting jars at all if possible
> and I would recommend looking into shading if it's really important for
> your application to use a specific incompatible version of a library. Jar
> (and extraClasspath) are really just
> for adding additional jars and I personally would try not to rely on
> classpath ordering to get the right libraries recognized.
>
> On Thu, Jul 16, 2020 at 1:55 PM Nupur Shukla 
> wrote:
>
>> Hello,
>>
>> How can we use *spark.jars* to specify conflicting jars (that is,
>> jars that are already present in the spark's default classpath)? Jars
>> specified in this conf get "appended" to the classpath, and thus get
>> looked at after the default classpath. Is it not intended to be used to
>> specify conflicting jars?
>> Meanwhile when *spark.driver.extraClassPath* conf is specified, this
>> path is "prepended" to the classpath and thus takes precedence over the
>> default classpath.
>>
>> How can I use both to specify different jars and paths but achieve a
>> precedence of spark.jars path > spark.driver.extraClassPath > spark default
>> classpath (left to right precedence order)?
>>
>> Experiment conducted:
>>
>> I am using sample-project.jar which has one class in it SampleProject.
>> This has a method which prints the version number of the jar. For this
>> experiment I am using 3 versions of this sample-project.jar
>> Sample-project-1.0.0.jar is present in the spark default classpath in my
>> test cluster
>> Sample-project-2.0.0.jar is present in folder /home//ClassPathConf
>> on driver
>> Sample-project-3.0.0.jar is present in  folder /home//JarsConf on
>> driver
>>
>> (Empty cell in img below means that conf was not specified)
>>
>> [image: image.png]
>>
>>
>> Thank you,
>> Nupur
>>
>>
>>


Re: Mock spark reads and writes

2020-07-15 Thread Jeff Evans
Why do you need to mock the read/write at all?  Why not create a test CSV
file, invoke your function on it (which will perform the real Spark DF read
of the CSV and the real write), and assert on the output?
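
A rough sketch of what I mean, assuming you are free to change main so that the session and both paths are passed in (the original only takes read_file). write_fixture and round_trip_check are hypothetical names, and the column names are made up for the fixture.

```python
import csv
import os
import tempfile


def write_fixture(path, rows):
    """Write a small CSV fixture that the function under test will read."""
    with open(path, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)


def main(spark, read_file, write_path):
    """The function under test, with the session and paths passed in."""
    df = spark.read.csv(read_file, header=True)
    # ... some other code ...
    df.write.csv(write_path, header=True, mode="overwrite")


def round_trip_check():
    """Run the real read and write against a temp dir (needs pyspark)."""
    from pyspark.sql import SparkSession  # imported lazily on purpose

    spark = SparkSession.builder.master("local[1]").appName("test").getOrCreate()
    try:
        with tempfile.TemporaryDirectory() as tmp:
            source = os.path.join(tmp, "in.csv")
            target = os.path.join(tmp, "out")
            write_fixture(source, [["id", "name"], ["1", "a"], ["2", "b"]])
            main(spark, source, target)
            result = spark.read.csv(target, header=True)
            assert sorted(row["id"] for row in result.collect()) == ["1", "2"]
    finally:
        spark.stop()
```

Call round_trip_check from whichever test runner you use. A local[1] session is slow to start compared with a mock, but it exercises the same reader and writer code the job will use.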

On Tue, Jul 14, 2020 at 12:19 PM Dark Crusader 
wrote:

> Sorry I wasn't very clear in my last email.
>
> I have a function like this:
>
> def main(read_file):
>     df = spark.read.csv(read_file)
>     # ... some other code ...
>     df.write.csv(path)
>
> Which I need to write a unit test for.
> Would Python's unittest.mock help me here?
>
> When I googled this, I mostly see that we shouldn't mock these reads and
> writes, but this doesn't solve the problem of how I unittest helper
> functions/main method that will have to read and write files.
>
> An example of the proper way to do this in python would be really helpful.
>
> Thanks a lot.
>


Re: How To Access Hive 2 Through JDBC Using Kerberos

2020-07-09 Thread Jeff Evans
There are various sample JDBC URLs documented here, depending on the driver
vendor, Kerberos (or not), and SSL (or not).  Often times, unsurprisingly,
SSL is used in conjunction with Kerberos.  Even if you don't use StreamSets
software at all, you might find these useful.

https://ask.streamsets.com/question/7/how-do-you-configure-a-hive-impala-jdbc-driver-for-data-collector/?answer=8#post-id-8

On Thu, Jul 9, 2020 at 11:28 AM Daniel de Oliveira Mantovani <
daniel.oliveira.mantov...@gmail.com> wrote:

> One of my colleagues found this solution:
>
>
> https://github.com/morfious902002/impala-spark-jdbc-kerberos/blob/master/src/main/java/ImpalaSparkJDBC.java
>
> If you need to connect to Hive or Impala using JDBC with Kerberos
> authentication from Apache Spark, you can use it and will work.
>
> You can download the driver from Cloudera here:
> https://www.cloudera.com/downloads/connectors/hive/jdbc/2-6-1.html
>
>
>
> On Tue, Jul 7, 2020 at 12:03 AM Daniel de Oliveira Mantovani <
> daniel.oliveira.mantov...@gmail.com> wrote:
>
>> Hello Gabor,
>>
>> I meant, third-party connector* not "connection".
>>
>> Thank you so much!
>>
>> On Mon, Jul 6, 2020 at 1:09 PM Gabor Somogyi 
>> wrote:
>>
>>> Hi Daniel,
>>>
>>> I'm just working on the developer API where any custom JDBC connection
>>> provider(including Hive) can be added.
>>> Not sure what you mean by third-party connection but AFAIK there is no
>>> workaround at the moment.
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Mon, Jul 6, 2020 at 12:09 PM Daniel de Oliveira Mantovani <
>>> daniel.oliveira.mantov...@gmail.com> wrote:
>>>
>>>> Hello List,
>>>>
>>>> Is it possible to access Hive 2 through JDBC with Kerberos
>>>> authentication from the Apache Spark JDBC interface? If it's possible,
>>>> do you have an example?
>>>>
>>>> I found these tickets on JIRA:
>>>> https://issues.apache.org/jira/browse/SPARK-12312
>>>> https://issues.apache.org/jira/browse/SPARK-31815
>>>>
>>>> Do you know if there's a workaround for this? Maybe using a
>>>> third-party connection?
>>>>
>>>> Thank you so much
>>>> --
>>>>
>>>> --
>>>> Daniel Mantovani


>>
>> --
>>
>> --
>> Daniel Mantovani
>>
>>
>
> --
>
> --
> Daniel Mantovani
>
>


Re: unsubscribe

2020-06-30 Thread Jeff Evans
That is not how you unsubscribe.  See here for instructions:
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e

On Tue, Jun 30, 2020 at 1:31 PM Bartłomiej Niemienionek <
b.niemienio...@gmail.com> wrote:

>


Re: File Not Found: /tmp/spark-events in Spark 3.0

2020-06-30 Thread Jeff Evans
This should only be needed if the spark.eventLog.enabled property was set
to true.  Is it possible the job configuration is different between your
two environments?
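
For reference, the directory itself is configurable: spark.eventLog.dir controls where the logs go, and its default is file:///tmp/spark-events. A small sketch of the two settings involved follows. event_log_conf is a hypothetical helper, and creating the directory this way only makes sense for a local path on the machine where the driver runs.

```python
import os


def event_log_conf(log_dir, create=True):
    """Return the settings that turn event logging on and point it at log_dir.

    Spark expects the directory to exist already, so optionally create it.
    """
    if create:
        os.makedirs(log_dir, exist_ok=True)
    return {
        "spark.eventLog.enabled": "true",
        "spark.eventLog.dir": "file://" + os.path.abspath(log_dir),
    }
```

The resulting pairs can go into spark-defaults.conf, the Zeppelin interpreter settings, or SparkSession.builder.config(key, value).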

On Mon, Jun 29, 2020 at 9:21 AM ArtemisDev  wrote:

> While launching a spark job from Zeppelin against a standalone spark
> cluster (Spark 3.0 with multiple workers without hadoop), we have
> encountered a Spark interpreter exception caused by a I/O File Not Found
> exception due to the non-existence of the /tmp/spark-events directory.
> We had to create the /tmp/spark-events directory manually in order to
> resolve the problem.
>
> As a reference, the same notebook code runs on Spark 2.4.6 (also a
> standalone cluster) without any problems.
>
> What is /tmp/spark-events for, and is there any way to pre-define this
> directory through some config parameter so we don't end up manually adding
> it in /tmp?
>
> Thanks!
>
> -- ND
>
>
>
>


Re: unsubscribe

2020-06-27 Thread Jeff Evans
That is not how you unsubscribe.  See here for instructions:
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e


On Sat, Jun 27, 2020, 6:08 PM Sri Kris  wrote:

>
>
>
>
> Sent from Mail  for
> Windows 10
>
>
>


Re: Where are all the jars gone ?

2020-06-24 Thread Jeff Evans
If I'm understanding this correctly, you are building Spark from source and
using the built artifacts (jars) in some other project.  Correct?  If so,
then why are you concerning yourself with the directory structure that
Spark, internally, uses when building its artifacts?  It should be a black
box to your application, entirely.  You would pick the profiles (ex: Scala
version, Hadoop version, etc.) you need, then the install phase of Maven
will take care of building the jars and putting them in your local Maven
repo.  After that, you can resolve them from your other project seamlessly
(simply by declaring the org/artifact/version).

Maven artifacts are immutable, at least released versions in Maven
central.  If "someone" (unclear who you are talking about) is "swapping
out" jars in a Maven repo then they're doing something extremely strange
and broken, unless they're simply replacing snapshot versions, which
is a different beast entirely.

On Wed, Jun 24, 2020 at 10:39 AM Anwar AliKhan 
wrote:

> THANKS
>
>
> It appears the directory containing the jars have been switched from
> download version to source version.
>
> In the download version it is just below parent directory called jars.
> level 1.
>
> In the git source version it is  4 levels down in the directory
>  /spark/assembly/target/scala-2.12/jars
>
> The issue I have with using maven is that the linking libraries can be
> changed at maven repository without my knowledge.
> So an application that compiled and worked previously could just break.
>
> It is not like when the developers make a change to the link libraries
> they run it by me first, they just upload it to maven repository without
> asking me if their change is going to impact my app.
>
>
>
>
>
>
> On Wed, 24 Jun 2020, 16:07 ArtemisDev,  wrote:
>
>> If you are using Maven to manage your jar dependencies, the jar files are
>> located in the maven repository on your home directory.  It is usually in
>> the .m2 directory.
>>
>> Hope this helps.
>>
>> -ND
>> On 6/23/20 3:21 PM, Anwar AliKhan wrote:
>>
>> Hi,
>>
>> I prefer to do most of my projects in Python and for that I use Jupyter.
>> I have been downloading the compiled version of spark.
>>
>> I do not normally like the source code version because the build process
>> makes me nervous.
>> You know with lines of stuff   scrolling up the screen.
>> What am I am going to do if a build fails. I am a user!
>>
>> I decided to risk it and it was only one  mvn command to build. (45
>> minutes later)
>> Everything is great. Success.
>>
>> I removed all jvms except jdk8 for compilation.
>>
>> I used jdk8 so I know which libraries were linked in the build process.
>> I also used my local version of maven. Not the apt install version.
>>
>> I used jdk8 because if you go this scala site.
>>
>> http://scala-ide.org/download/sdk.html. they say requirement  jdk8 for
>> IDE
>>  even for scala12.
>> They don't say JDK 8 or higher ,  just jdk8.
>>
>> So anyway  once in a while I  do spark projects in scala with eclipse.
>>
>> For that I don't use maven or anything. I prefer to make use of build path
>> And external jars. This way I know exactly which libraries I am linking
>> to.
>>
>> Creating a jar in eclipse is straightforward for spark-submit.
>>
>>
>> Anyway, as you can see (below) I am pointing jupyter at Spark with
>> findspark.init('/opt/spark').
>> That's OK everything is fine.
>>
>> With the compiled version of spark there is a jar directory which I have
>> been using in eclipse.
>>
>>
>>
>> With my own compiled from source version there is no jar directory.
>>
>>
>> Where are all the jars gone  ?.
>>
>>
>>
>> I am not sure how findspark.init('/opt/spark') is locating the libraries
>> unless it is finding them from
>> Anaconda.
>>
>>
>> import findspark
>> findspark.init('/opt/spark')
>> from pyspark.sql import SparkSession
>> spark = SparkSession \
>>     .builder \
>>     .appName('Titanic Data') \
>>     .getOrCreate()
>>
>>


Re: apache-spark mongodb dataframe issue

2020-06-23 Thread Jeff Evans
As far as I know, in general, there isn't a way to distinguish explicit
null values from missing ones.  (Someone please correct me if I'm wrong,
since I would love to be able to do this for my own reasons).  If you
really must do it, and don't care about performance at all (since it will
be horrible), read each object as a separate batch, while inferring the
schema.  If the schema contains the column, but the value is null, you will
know it was explicitly set that way.  If the schema doesn't contain the
column, you'll know it was missing.
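
The distinction is much easier to make while the records are still individual documents, before they are merged under one DataFrame schema. A plain-Python illustration of that idea, with no Spark or MongoDB connector involved: field_state is a hypothetical helper, and it assumes each document is available as a JSON string with the field at the top level.

```python
import json


def field_state(raw_document, field):
    """Classify `field` in one JSON document as 'missing', 'null' or 'present'."""
    document = json.loads(raw_document)
    if field not in document:
        return "missing"
    return "null" if document[field] is None else "present"
```

The resulting label could travel with the record as an extra column, so the information survives once the documents are combined.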

On Tue, Jun 23, 2020 at 7:34 AM Harmanat Singh 
wrote:

> Hi
>
> Please look at my issue from the link below.
>
> https://stackoverflow.com/questions/62526118/how-to-differentiate-between-null-and-missing-mongogdb-values-in-a-spark-datafra
>
>
> Kindly Help
>
>
> Best
> Mannat
>


Re: unsubscribe

2020-06-17 Thread Jeff Evans
That is not how you unsubscribe.  See here:
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e

On Wed, Jun 17, 2020 at 8:56 AM DIALLO Ibrahima (BPCE-IT - Consultime)
 wrote:

>
>
>
>
> *Ibrahima DIALLO*
>
> *Big Data Consultant – Architect – Analyst*
>
> *Consultime* - *For BPCE-IT – BPCE Group*
>
> *D2I_FDT_DMA_BD2*
>
> *BPCE Infogérance & Technologies*
>
> 110 Avenue de France – 75013 PARIS - Tel.: +33185342104
>
>
>
>
> --
> The integrity of this message cannot be guaranteed on the Internet.
> BPCE-IT cannot therefore be considered responsible for the contents. If you
> are not the intended recipient of this message, then please delete it and
> notify the sender.
> --
>


Re: unsubscribe

2020-06-17 Thread Jeff Evans
That is not how you unsubscribe.  See here:
https://gist.github.com/jeff303/ba1906bb7bcb2f2501528a8bb1521b8e

On Wed, Jun 17, 2020 at 5:39 AM Ferguson, Jon
 wrote:

>
>
> This message is confidential and subject to terms at:
> https://www.jpmorgan.com/emaildisclaimer including on confidential,
> privileged or legal entity information, viruses and monitoring of
> electronic messages. If you are not the intended recipient, please delete
> this message and notify the sender immediately. Any unauthorized use is
> strictly prohibited.
>


Re: XPATH_INT behavior - XML - Function in Spark

2020-05-12 Thread Jeff Evans
It sounds like you're expecting the XPath expression to evaluate embedded
Spark SQL expressions?  From the documentation, there appears to be no
reason to expect that to work.
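
As far as I know, xpath_int needs its path argument to be a constant string, so column names written inside the path are read as literal XPath text. If the week index and day are known on the driver, one workaround is to build the path before the query is issued. A rough sketch: timesheet_xpath and timesheet_query are hypothetical helpers, and the assumption that day elements are named after weekdays comes only from the friday example in this thread.

```python
_DAYS = {"monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"}


def timesheet_xpath(week, day):
    """Build the XPath for one week index and one day element name."""
    if not isinstance(week, int) or week < 1:
        raise ValueError("week must be a positive integer")
    if day not in _DAYS:
        raise ValueError(f"unexpected day element: {day!r}")
    return f"(/timesheetprofile/weeks/week[{week}]/{day})[1]"


def timesheet_query(week, day):
    """Embed the constant path in the SQL text before Spark parses it."""
    path = timesheet_xpath(week, day)
    return (
        "select timesheet_profile_id, "
        f"xpath_int(timesheet_profile_code, '{path}') "
        "from TIMESHEET_PROFILE_ATT"
    )
```

The result goes to spark.sql(timesheet_query(2, "friday")). If the week and day really vary per row, this does not help, and something like a UDF that evaluates the XPath per row would be needed instead.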

On Tue, May 12, 2020 at 2:09 PM Chetan Khatri 
wrote:

> Can someone please help.. Thanks in advance.
>
> On Mon, May 11, 2020 at 5:29 PM Chetan Khatri 
> wrote:
>
>> Hi Spark Users,
>>
>> I want to parse XML stored in a query column and extract a value. I am
>> using xpath_int, which works for my requirement, but it fails when I
>> embed Spark SQL query columns in the XPath expression:
>>
>> select timesheet_profile_id,
>> xpath_int(timesheet_profile_code,
>> '(/timesheetprofile/weeks/week[td.current_week]/td.day)[1]')
>>
>> This fails, whereas hardcoded values work for the same scenario:
>>
>> scala> spark.sql("select timesheet_profile_id,
>> xpath_int(timesheet_profile_code,
>> '(/timesheetprofile/weeks/week[2]/friday)[1]') from
>> TIMESHEET_PROFILE_ATT").show(false)
>>
>> Has anyone worked on this? Thanks in advance.
>>
>> Thanks
>> - Chetan
>>
>>


Re: java.lang.OutOfMemoryError Spark Worker

2020-05-07 Thread Jeff Evans
You might want to double check your Hadoop config files.  From the stack
trace it looks like this is happening when simply trying to load
configuration (XML files).  Make sure they're well formed.
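
A quick way to check that, sketched with the same JDK entry point that
appears in the trace (DocumentBuilder.parse). The class name is made up,
and which files to pass in is up to you (whatever *-site.xml files the
worker picks up). The frames go through scanComment, so an unterminated or
very large comment may be one thing to look for, though that is only a
guess from the trace.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;

final class HadoopConfXmlCheck {
    // Returns null if the stream parses as well-formed XML, otherwise a description of the failure.
    static String findProblem(InputStream in) {
        try {
            DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(in);
            return null;
        } catch (SAXException | IOException | ParserConfigurationException e) {
            return e.toString();
        }
    }

    // Convenience overload for checking XML held in a string.
    static String findProblem(String xml) {
        return findProblem(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) {
        // Usage: java HadoopConfXmlCheck /path/to/core-site.xml /path/to/yarn-site.xml
        for (String path : args) {
            try (InputStream in = Files.newInputStream(Paths.get(path))) {
                String problem = findProblem(in);
                System.out.println(path + ": " + (problem == null ? "well formed" : problem));
            } catch (IOException e) {
                System.out.println(path + ": could not be read: " + e);
            }
        }
    }
}
```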

On Thu, May 7, 2020 at 6:12 AM Hrishikesh Mishra 
wrote:

> Hi
>
> I am getting an out-of-memory error in the worker log of a streaming job
> every couple of hours, after which the worker dies. There is no shuffle, no
> aggregation, and no caching in the job; it's just a transformation.
> I'm not able to identify where the problem is, driver or executor, or why
> the worker dies after the OOM when only the streaming job should die. Am I
> missing something?
>
> Driver Memory:  2g
> Executor memory: 4g
>
> Spark Version:  2.4
> Kafka Direct Stream
> Spark Standalone Cluster.
>
>
> 20/05/06 12:52:20 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(root); groups
> with view permissions: Set(); users  with modify permissions: Set(root);
> groups with modify permissions: Set()
>
> 20/05/06 12:53:03 ERROR SparkUncaughtExceptionHandler: Uncaught exception
> in thread Thread[ExecutorRunner for app-20200506124717-10226/0,5,main]
>
> java.lang.OutOfMemoryError: Java heap space
>
> at org.apache.xerces.util.XMLStringBuffer.append(Unknown Source)
>
> at org.apache.xerces.impl.XMLEntityScanner.scanData(Unknown Source)
>
> at org.apache.xerces.impl.XMLScanner.scanComment(Unknown Source)
>
> at
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanComment(Unknown
> Source)
>
> at
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl$FragmentContentDispatcher.dispatch(Unknown
> Source)
>
> at
> org.apache.xerces.impl.XMLDocumentFragmentScannerImpl.scanDocument(Unknown
> Source)
>
> at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>
> at org.apache.xerces.parsers.XML11Configuration.parse(Unknown Source)
>
> at org.apache.xerces.parsers.XMLParser.parse(Unknown Source)
>
> at org.apache.xerces.parsers.DOMParser.parse(Unknown Source)
>
> at org.apache.xerces.jaxp.DocumentBuilderImpl.parse(Unknown Source)
>
> at javax.xml.parsers.DocumentBuilder.parse(DocumentBuilder.java:150)
>
> at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2480)
>
> at org.apache.hadoop.conf.Configuration.parse(Configuration.java:2468)
>
> at
> org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2539)
>
> at
> org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
>
> at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
>
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1143)
>
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1115)
>
> at
> org.apache.spark.deploy.SparkHadoopUtil$.org$apache$spark$deploy$SparkHadoopUtil$$appendS3AndSparkHadoopConfigurations(SparkHadoopUtil.scala:464)
>
> at
> org.apache.spark.deploy.SparkHadoopUtil$.newConfiguration(SparkHadoopUtil.scala:436)
>
> at
> org.apache.spark.deploy.SparkHadoopUtil.newConfiguration(SparkHadoopUtil.scala:114)
>
> at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:114)
>
> at
> org.apache.spark.deploy.worker.ExecutorRunner.org$apache$spark$deploy$worker$ExecutorRunner$$fetchAndRunExecutor(ExecutorRunner.scala:149)
>
> at
> org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:73)
>
> 20/05/06 12:53:38 INFO DriverRunner: Worker shutting down, killing driver
> driver-20200505181719-1187
>
> 20/05/06 12:53:38 INFO DriverRunner: Killing driver process!
>
>
>
>
> Regards
> Hrishi
>


Re: [Spark SQL][Beginner] Spark throw Catalyst error while writing the dataframe in ORC format

2020-05-07 Thread Jeff Evans
You appear to be hitting the broadcast timeout.  See:
https://stackoverflow.com/a/41126034/375670

On Thu, May 7, 2020 at 8:56 AM Deepak Garg  wrote:

> Hi,
>
> I am getting following error while running a spark job. Error
> occurred when Spark is trying to write the dataframe in ORC format . I am
> pasting the error trace.
>
> Any help in resolving this would be appreciated.
>
> Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException:
> execute, tree:
>
> Exchange hashpartitioning(t_time#5, stg_name#842, access_ne#843,
> switch#844, 200)
> +- *(13) HashAggregate(keys=[t_time#5, stg_name#842, access_ne#843,
> switch#844], functions=[partial_count(1)], output=[t_time#5, stg_name#842,
> access_ne#843, switch#844, count#985L])
>    +- Union
>
> at
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> at
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
> at
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
> at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.joins.SortMergeJoinExec.doExecute(SortMergeJoinExec.scala:150)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
> at
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:180)
> ... 28 more
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
> at
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
> at
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:367)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> 

Re: Which SQL flavor does Spark SQL follow?

2020-05-06 Thread Jeff Evans
https://docs.databricks.com/spark/latest/spark-sql/language-manual/index.html

https://spark.apache.org/docs/latest/api/sql/index.html

On Wed, May 6, 2020 at 3:35 PM Aakash Basu 
wrote:

> Hi,
>
> I'd like to know which SQL syntax is followed when we write a plain
> SQL query inside spark.sql. Is it MySQL or PGSQL? I know it isn't SQL
> Server or Oracle because, while migrating, I had to convert a lot of SQL
> functions.
>
> Also, a pointer to documentation that clearly states this would
> help.
>
> Thanks,
> AB
>
>


Re: [Meta] Moderation request diversion?

2020-04-24 Thread Jeff Evans
Thanks, Sean; much appreciated.

On Fri, Apr 24, 2020 at 1:09 PM Sean Owen  wrote:

> The mailing lists are operated by the ASF. I've asked whether it's
> possible here: https://issues.apache.org/jira/browse/INFRA-20186
>
> On Fri, Apr 24, 2020 at 12:39 PM Jeff Evans
>  wrote:
> >
> > Still noticing this problem quite a bit, both on the user and dev
> > lists.  I notice that it appears to be using ezmlm as the software.  Is
> > there any chance the list owner (someone at Databricks?) can take a look at
> > restricting messages based on seeing the word "Unsubscribe" in the
> > subject?  It appears to be possible; see:
> > http://untroubled.org/ezmlm/faq/Restricting-posts-based-on-the-Subject-line.html#Restricting-posts-based-on-the-Subject-line
> >
> > On Mon, Jun 24, 2019 at 3:45 PM Jeff Evans <
> > jeffrey.wayne.ev...@gmail.com> wrote:
> >>
> >> There seem to be a lot of people trying to unsubscribe via the main
> >> address, rather than following the instructions from the welcome
> >> email.  Of course, this is not all that surprising, but it leads to a
> >> lot of pointless threads*.  Is there a way to enable automatic
> >> detection and diversion of such requests?  I don't know what
> >> particular software is running this list, but mailman, for example,
> >> has such a capability.  I feel that such a thing would improve the
> >> signal-to-noise ratio of this valuable list.  Thanks.
> >>
> >> * see "unsubscribe" messages in the archive:
> >> http://apache-spark-user-list.1001560.n3.nabble.com/
>


Re: [Meta] Moderation request diversion?

2020-04-24 Thread Jeff Evans
Still noticing this problem quite a bit, both on the user and dev lists.  I
notice that it appears to be using ezmlm as the software.  Is there any
chance the list owner (someone at Databricks?) can take a look at
restricting messages based on seeing the word "Unsubscribe" in the
subject?  It appears to be possible; see:
http://untroubled.org/ezmlm/faq/Restricting-posts-based-on-the-Subject-line.html#Restricting-posts-based-on-the-Subject-line

On Mon, Jun 24, 2019 at 3:45 PM Jeff Evans 
wrote:

> There seem to be a lot of people trying to unsubscribe via the main
> address, rather than following the instructions from the welcome
> email.  Of course, this is not all that surprising, but it leads to a
> lot of pointless threads*.  Is there a way to enable automatic
> detection and diversion of such requests?  I don't know what
> particular software is running this list, but mailman, for example,
> has such a capability.  I feel that such a thing would improve the
> signal-to-noise ratio of this valuable list.  Thanks.
>
> * see "unsubscribe" messages in the archive:
> http://apache-spark-user-list.1001560.n3.nabble.com/
>


Re: SPARK Suitable IDE

2020-03-02 Thread Jeff Evans
For developing Spark itself, or applications built using Spark? In either
case, IntelliJ IDEA works well. For the former case, there is even a page
explaining how to set it up. https://spark.apache.org/developer-tools.html

On Mon, Mar 2, 2020, 4:43 PM Zahid Rahman  wrote:

> Hi,
>
> Can you recommend a suitable IDE for Apache Spark from the list below, or
> a more suitable one if you know of one?
>
> Codeanywhere
> goormIDE
> Koding
> SourceLair
> ShiftEdit
> Browxy
> repl.it
> PaizaCloud IDE
> Eclipse Che
> Visual Studio Online
> Gitpod
> Google Cloud Shell
> Codio
> Codepen
> CodeTasty
> Glitch
> JSitor
> ICEcoder
> Codiad
> Dirigible
> Orion
> Codiva.io
> Collide
> Codenvy
> AWS Cloud9
> JSFiddle
> GitLab
> SLAppForge Sigma
> Jupyter
> CoCalc
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


Re: What options do I have to handle third party classes that are not serializable?

2020-02-25 Thread Jeff Evans
Did you try this?  https://stackoverflow.com/a/2114387/375670
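
One commonly used option (not necessarily what the linked answer
describes) is to keep the non-serializable object out of the serialized
state: mark it transient and rebuild it lazily on whichever JVM needs it.
A minimal sketch with plain Java serialization follows.
NonSerializableConfig is a stand-in for a third-party class such as
FacetsConfig, and LazyHolder is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a third-party class that cannot be changed and is not Serializable.
final class NonSerializableConfig {
    final String name;

    NonSerializableConfig(String name) {
        this.name = name;
    }
}

// Serializable holder: only the constructor argument is serialized; the
// third-party object is transient and rebuilt on first use after deserialization.
final class LazyHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;
    private transient NonSerializableConfig config;

    LazyHolder(String name) {
        this.name = name;
    }

    synchronized NonSerializableConfig get() {
        if (config == null) {
            config = new NonSerializableConfig(name);
        }
        return config;
    }

    // Round-trips a holder through Java serialization, similar to what happens
    // when an object is shipped to another JVM.
    static LazyHolder roundTrip(LazyHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazyHolder copy = roundTrip(new LazyHolder("facets"));
        System.out.println(copy.get().name);
    }
}
```

Note that the error quoted below is about a task result, so in that case
the task would also have to return serializable data (or a holder like
this) rather than the FacetsConfig itself.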


On Tue, Feb 25, 2020 at 10:23 AM yeikel valdes  wrote:

> I am currently using a third party library(Lucene) with Spark that is not
> serializable. Due to that reason, it generates the following exception  :
>
> Job aborted due to stage failure: Task 144.0 in stage 25.0 (TID 2122) had a 
> not serializable result: org.apache.lucene.facet.FacetsConfig Serialization 
> stack: - object not serializable (class: 
> org.apache.lucene.facet.FacetsConfig, value: 
> org.apache.lucene.facet.FacetsConfg
>
> While it would be ideal if this class was serializable, there is really 
> nothing I can do to change this third party library in order to add 
> serialization to it.
>
> What options do I have, and what's the recommended option to handle this 
> problem?
>
> Thank you!
>
>
>


Re: Possible to limit number of IPC retries on spark-submit?

2020-01-31 Thread Jeff Evans
Figured out the answer, eventually.  The magic property name, in this case,
is yarn.client.failover-max-attempts (prefixed with spark.hadoop. in the
case of Spark, of course).  For a full explanation, see the StackOverflow
answer <https://stackoverflow.com/a/60011708/375670> I just added.
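
A small sketch of how such a Hadoop property ends up on the spark-submit
command line. The helper names and the value 3 are illustrative
assumptions; only the property name and the spark.hadoop. prefix come from
the message above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class HadoopConfArgs {
    // Spark passes properties prefixed with "spark.hadoop." on to the Hadoop configuration.
    static String toSparkKey(String hadoopKey) {
        return "spark.hadoop." + hadoopKey;
    }

    // Renders each Hadoop property as a "--conf key=value" pair for spark-submit.
    static List<String> confArgs(Map<String, String> hadoopProps) {
        List<String> args = new ArrayList<>();
        for (Map.Entry<String, String> entry : hadoopProps.entrySet()) {
            args.add("--conf");
            args.add(toSparkKey(entry.getKey()) + "=" + entry.getValue());
        }
        return args;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("yarn.client.failover-max-attempts", "3"); // illustrative value
        List<String> command = new ArrayList<>();
        command.add("spark-submit");
        command.addAll(confArgs(props));
        System.out.println(String.join(" ", command));
    }
}
```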

On Wed, Jan 22, 2020 at 5:02 PM Jeff Evans 
wrote:

> Greetings,
>
> Is it possible to limit the number of times the IPC client retries upon a
> spark-submit invocation?  For context, see this StackOverflow post
> <https://stackoverflow.com/questions/59863850/how-to-control-the-number-of-hadoop-ipc-retry-attempts-for-a-spark-job-submissio>.
> In essence, I am trying to call spark-submit on a Kerberized cluster,
> without having valid Kerberos tickets available.  This is deliberate, and
> I'm not truly facing a Kerberos issue.  Rather, this is the
> easiest reproducible case of "long IPC retry" I have been able to trigger.
>
> In this particular case, the following errors are printed (presumably by
> the launcher):
>
> 20/01/22 15:49:32 INFO retry.RetryInvocationHandler: java.io.IOException: 
> Failed on local exception: java.io.IOException: 
> org.apache.hadoop.security.AccessControlException: Client cannot authenticate 
> via:[TOKEN, KERBEROS]; Host Details : local host is: 
> "node-1.cluster/172.18.0.2"; destination host is: "node-1.cluster":8032; , 
> while invoking ApplicationClientProtocolPBClientImpl.getClusterMetrics over 
> null after 1 failover attempts. Trying to failover after sleeping for 35160ms.
>
> This repeats 30 times before the launcher finally gives up.
>
> As indicated in the answer on that StackOverflow post, the relevant Hadoop
> properties should be ipc.client.connect.max.retries and/or
> ipc.client.connect.max.retries.on.sasl.  However, in testing on Spark
> 2.4.0 (on CDH 6.1), I am not able to get either of these to take effect (it
> still retries 30 times regardless).  I am trying the SparkPi example, and
> specifying them with --conf spark.hadoop.ipc.client.connect.max.retries
> and/or --conf spark.hadoop.ipc.client.connect.max.retries.on.sasl.
>
> Any ideas on what I could be doing wrong, or why I can't get these
> properties to take effect?
>


Possible to limit number of IPC retries on spark-submit?

2020-01-22 Thread Jeff Evans
Greetings,

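Is it possible to limit the number of times the IPC client retries upon a
spark-submit invocation?  For context, see this StackOverflow post
<https://stackoverflow.com/questions/59863850/how-to-control-the-number-of-hadoop-ipc-retry-attempts-for-a-spark-job-submissio>.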
In essence, I am trying to call spark-submit on a Kerberized cluster,
without having valid Kerberos tickets available.  This is deliberate, and
I'm not truly facing a Kerberos issue.  Rather, this is the
easiest reproducible case of "long IPC retry" I have been able to trigger.

In this particular case, the following errors are printed (presumably by
the launcher):

20/01/22 15:49:32 INFO retry.RetryInvocationHandler:
java.io.IOException: Failed on local exception: java.io.IOException:
org.apache.hadoop.security.AccessControlException: Client cannot
authenticate via:[TOKEN, KERBEROS]; Host Details : local host is:
"node-1.cluster/172.18.0.2"; destination host is:
"node-1.cluster":8032; , while invoking
ApplicationClientProtocolPBClientImpl.getClusterMetrics over null
after 1 failover attempts. Trying to failover after sleeping for
35160ms.

This repeats 30 times before the launcher finally gives up.

As indicated in the answer on that StackOverflow post, the relevant Hadoop
properties should be ipc.client.connect.max.retries and/or
ipc.client.connect.max.retries.on.sasl.  However, in testing on Spark 2.4.0
(on CDH 6.1), I am not able to get either of these to take effect (it still
retries 30 times regardless).  I am trying the SparkPi example, and
specifying them with --conf spark.hadoop.ipc.client.connect.max.retries
and/or --conf spark.hadoop.ipc.client.connect.max.retries.on.sasl.

Any ideas on what I could be doing wrong, or why I can't get these
properties to take effect?


Re: Is there a way to get the final web URL from an active Spark context

2020-01-22 Thread Jeff Evans
To answer my own question, it turns out what I was after is the YARN
ResourceManager URL for the Spark application.  As alluded to in SPARK-20458
<https://issues.apache.org/jira/browse/SPARK-20458>, it's possible to use
the YARN API client to get this value.  Here is a gist that shows how it
can be done (given an instance of the Hadoop Configuration object):
https://gist.github.com/jeff303/8dab0e52dc227741b6605f576a317798


On Fri, Jan 17, 2020 at 4:09 PM Jeff Evans 
wrote:

> Given a session/context, we can get the UI web URL like this:
>
> sparkSession.sparkContext.uiWebUrl
>
> This gives me something like http://node-name.cluster-name:4040.  If
> opening this from outside the cluster (ex: my laptop), this redirects
> via HTTP 302 to something like
>
> http://node-name.cluster-name:8088/proxy/redirect/application_1579210019853_0023/
> .
> For discussion purposes, call the latter one the "final web URL".
> Critically, this final URL is active even after the application
> terminates.  The original uiWebUrl
> (http://node-name.cluster-name:4040) is not available after the
> application terminates, so one has to have captured the redirect in
> time, if they want to provide a persistent link to that history server
> UI entry (ex: for debugging purposes).
>
> Is there a way, other than using some HTTP client, to detect what this
> final URL will be directly from the SparkContext?
>


Is there a way to get the final web URL from an active Spark context

2020-01-17 Thread Jeff Evans
Given a session/context, we can get the UI web URL like this:

sparkSession.sparkContext.uiWebUrl

This gives me something like http://node-name.cluster-name:4040.  If
opening this from outside the cluster (ex: my laptop), this redirects
via HTTP 302 to something like
http://node-name.cluster-name:8088/proxy/redirect/application_1579210019853_0023/.
For discussion purposes, call the latter one the "final web URL".
Critically, this final URL is active even after the application
terminates.  The original uiWebUrl
(http://node-name.cluster-name:4040) is not available after the
application terminates, so one has to have captured the redirect in
time, if they want to provide a persistent link to that history server
UI entry (ex: for debugging purposes).

Is there a way, other than using some HTTP client, to detect what this
final URL will be directly from the SparkContext?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



What's the deal with --proxy-user?

2019-11-06 Thread Jeff Evans
Hi all,

I'm trying to understand if the --proxy-user parameter to spark-submit is
deprecated, or something similar?  The reason I ask is because it's hard to
find documentation really talking about it.  The Spark Security doc doesn't
mention it at all, nor does the YARN doc.  It does show up in the source
code in this README file.  Is that supposed to be the definitive
documentation for the option?

Looking into vendor specific docs, it disappears between the CDH 5.9 and
CDH 5.10 docs.  I couldn't find it anywhere in the HDP documentation, but
I'm not as familiar with navigating there.

And perhaps I'm reading a bit too much into the tea leaves here, but the
comments on SPARK-13478 (which removed the ability to specify --proxy-user
along with --principal and --keytab) seem to read like "you shouldn't be
using it; you should just use principal and keytab".

So, am I going crazy or is this really not clearly documented?  Is it a
deprecated feature or will it continue to be supported long term?  Any kind
of insight is appreciated.


Deleting columns within nested arrays/structs?

2019-10-29 Thread Jeff Evans
The starting point for the code is the various answers to this
StackOverflow question.  Fixing some of the issues there, I end up with the
following:

  import org.apache.spark.sql.{Column, DataFrame}
  import org.apache.spark.sql.functions.{array, col, struct}
  import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

  def dropColumn(df: DataFrame, colName: String): DataFrame = {
    df.schema.fields
      .flatMap(f => {
        if (colName.startsWith(s"${f.name}.")) {
          dropSubColumn(col(f.name), f.dataType, f.name, colName) match {
            case Some(x) => Some((f.name, x))
            case None => None
          }
        } else {
          None
        }
      })
      .foldLeft(df) {
        case (df, (colName, column)) => df.withColumn(colName, column)
      }
  }

  def dropSubColumn(col: Column, colType: DataType, fullColName: String,
      dropColName: String): Option[Column] = {
    if (fullColName.equals(dropColName)) {
      None
    } else if (dropColName.startsWith(s"$fullColName.")) {
      colType match {
        case colType: StructType =>
          Some(struct(
            colType.fields
              .flatMap(f =>
                dropSubColumn(col.getField(f.name), f.dataType,
                  s"$fullColName.${f.name}", dropColName) match {
                  case Some(x) => Some(x.alias(f.name))
                  case None => None
                })
              : _*))
        case colType: ArrayType =>
          colType.elementType match {
            case innerType: StructType =>
              Some(array(struct(innerType.fields
                .flatMap(f =>
                  dropSubColumn(col.getField(f.name), f.dataType,
                    s"$fullColName.${f.name}", dropColName) match {
                    case Some(x) => Some(x.alias(f.name))
                    case None => None
                  })
                : _*)))
          }

        case _ => Some(col)
      }
    } else {
      Some(col)
    }
  }

Now, when I try this out on a simple nested JSON, it seems to work, in the
sense that non-removed column names still exist.  However, the type of the
"surviving" sibling field (i.e. the one not removed) has become wrapped in
an array type.  I have spent a while stepping through the code and can't
quite understand why this is happening.  Somehow, the GetArrayStructFields
class is involved.

// read some nested JSON with structs/arrays

val json = """{
  "foo": "bar",
  "top": {
    "child1": 5,
    "child2": [{
      "child2First": "one",
      "child2Second": 2
    }]
  }
}""".stripMargin

val df = spark.read.option("multiLine", "true").json(Seq(json).toDS())

val resultDf = dropColumn(df, "top.child2.child2First")

resultDf.select("top.child2.child2Second").show()
/*
+------------+
|child2Second|
+------------+
|       [[2]]|
+------------+
*/

// check the same from the original DataFrame

df.select("top.child2.child2Second").show()
/*
+------------+
|child2Second|
+------------+
|         [2]|
+------------+
*/

// check the field type for "child2Second"
resultDf.schema.fields(1).dataType.asInstanceOf[StructType].fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(0).dataType.typeName
// prints array

// check the same from the original DataFrame (when it was index 1)
df.schema.fields(1).dataType.asInstanceOf[StructType].fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(1).dataType.typeName
// prints long

Is the code above incorrect, with regards to dropping nested fields (in
this case, a field within a struct, which itself is in an array)?  Or is
there some other consideration I'm missing?  Any insight is appreciated.


Distinguishing between field missing and null in individual record?

2019-06-25 Thread Jeff Evans
Suppose we have the following JSON, which we parse into a DataFrame
(using the multiLine option).

[{
  "id": 8541,
  "value": "8541 changed again value"
},{
  "id": 51109,
  "name": "newest bob",
  "value": "51109 changed again"
}]

Regardless of whether we explicitly define a schema, or allow it to be
inferred, the result of df.show(), after parsing this data, is similar
to the following:

+-----+----------+--------------------+
|   id|      name|               value|
+-----+----------+--------------------+
| 8541|      null|8541 changed agai...|
|51109|newest bob| 51109 changed again|
+-----+----------+--------------------+

Notice that the name column for the first row is null.  This JSON will
produce an identical DataFrame:

[{
  "id": 8541,
  "name": null,
  "value": "8541 changed again value"
},{
  "id": 51109,
  "name": "newest bob",
  "value": "51109 changed again"
}]

Is there a way to distinguish between these two cases in the DataFrame
(i.e. field is missing, but added as null due to inferred or explicit
schema, versus field is present but with null value)?




[Meta] Moderation request diversion?

2019-06-24 Thread Jeff Evans
There seem to be a lot of people trying to unsubscribe via the main
address, rather than following the instructions from the welcome
email.  Of course, this is not all that surprising, but it leads to a
lot of pointless threads*.  Is there a way to enable automatic
detection and diversion of such requests?  I don't know what
particular software is running this list, but mailman, for example,
has such a capability.  I feel that such a thing would improve the
signal-to-noise ratio of this valuable list.  Thanks.

* see "unsubscribe" messages in the archive:
http://apache-spark-user-list.1001560.n3.nabble.com/




Is there a difference between --proxy-user or HADOOP_USER_NAME in a non-Kerberized YARN cluster?

2019-05-16 Thread Jeff Evans
Let's suppose we're dealing with a non-secured (i.e. not Kerberized)
YARN cluster.  When I invoke spark-submit, is there a practical
difference between specifying --proxy-user=foo (supposing
impersonation is properly set up) or setting the environment variable
HADOOP_USER_NAME=foo?  Thanks for any insights or links to docs.




Re: Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Jeff Evans
Thanks for the pointers.  We figured out the stdout/stderr capture
piece.  I was just looking to capture the full command in order to
help debug issues we run into with the submit depending on various
combinations of all parameters/classpath, and also to isolate job
specific issues from our wrapping application (i.e. being able to
submit the job directly, rather than through our app).  I will use the
environment variable method for now.
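
For reference, a sketch of the environment variable approach using a plain
java.lang.ProcessBuilder rather than SparkLauncher (an assumption made for
illustration; SparkLauncher also has a constructor taking a map of
environment variables for the child process). The class name is made up,
and the command passed in is whatever would normally be launched.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LaunchCommandCapture {
    // Runs the command with SPARK_PRINT_LAUNCH_COMMAND=1 set in the child's
    // environment and returns everything the child wrote to stderr, line by line.
    static List<String> runAndCaptureStderr(List<String> command) {
        ProcessBuilder builder = new ProcessBuilder(command);
        builder.environment().put("SPARK_PRINT_LAUNCH_COMMAND", "1");
        // Let stdout pass through so only the stderr pipe has to be drained here.
        builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
        List<String> lines = new ArrayList<>();
        try {
            Process process = builder.start();
            try (BufferedReader reader = new BufferedReader(
                     new InputStreamReader(process.getErrorStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
            process.waitFor();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: java LaunchCommandCapture <command> [args...]");
            return;
        }
        // The captured lines can be sent to the application's own log file instead.
        for (String line : runAndCaptureStderr(Arrays.asList(args))) {
            System.out.println("stderr: " + line);
        }
    }
}
```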

On Wed, Apr 24, 2019 at 4:18 PM Marcelo Vanzin  wrote:
>
> BTW the SparkLauncher API has hooks to capture the stderr of the
> spark-submit process into the logging system of the parent process.
> Check the API javadocs since it's been forever since I looked at that.
>
> On Wed, Apr 24, 2019 at 1:58 PM Marcelo Vanzin  wrote:
> >
> > Setting the SPARK_PRINT_LAUNCH_COMMAND env variable to 1 in the
> > launcher env will make Spark code print the command to stderr. Not
> > optimal but I think it's the only current option.
> >
> > On Wed, Apr 24, 2019 at 1:55 PM Jeff Evans
> >  wrote:
> > >
> > > The org.apache.spark.launcher.SparkLauncher is used to construct a
> > > spark-submit invocation programmatically, via a builder pattern.  In
> > > our application, which uses a SparkLauncher internally, I would like
> > > to log the full spark-submit command that it will invoke to our log
> > > file, in order to aid in debugging/support.  However, I can't figure
> > > out a way to do this.  This snippet would work, except for the fact
> > > that the createBuilder method is private.
> > >
> > > sparkLauncher.createBuilder().command()
> > >
> > > Is there an alternate way of doing this?  The Spark version is
> > > 2.11:2.4.0.  Thanks.
> > >
> >
> >
> > --
> > Marcelo
>
>
>
> --
> Marcelo




Is it possible to obtain the full command to be invoked by SparkLauncher?

2019-04-24 Thread Jeff Evans
The org.apache.spark.launcher.SparkLauncher is used to construct a
spark-submit invocation programmatically, via a builder pattern.  In
our application, which uses a SparkLauncher internally, I would like
to log the full spark-submit command that it will invoke to our log
file, in order to aid in debugging/support.  However, I can't figure
out a way to do this.  This snippet would work, except for the fact
that the createBuilder method is private.

sparkLauncher.createBuilder().command()

Is there an alternate way of doing this?  The Spark version is
2.11:2.4.0.  Thanks.




Why does this spark-shell invocation get suspended due to tty output?

2019-04-04 Thread Jeff Evans
Hi all,

I am trying to make our application check the Spark version before
attempting to submit a job, to ensure the user is on a new enough
version (in our case, 2.3.0 or later).  I realize that there is a
--version argument to spark-shell, but that prints the version next to
some ASCII art so a bit of parsing would be needed.  So my initial
idea was to do something like the following:

echo 'System.out.println(sc.version)' | spark-shell 2>/dev/null | grep
-A2 'System.out.println' | grep -v 'System.out.println'

This works fine.  If you run in a shell (assuming spark-shell is on
your PATH), you get the version printed to the first line of stdout.
But I'm noticing some strange behavior when I try to invoke this from
either a Java or Scala application (via their respective
ProcessBuilder functionalities).  To be specific, if that Java/Scala
application is run as a background job, then when the spark-shell
invocation happens (after being forked by a ProcessBuilder), the job
goes into suspended state due to tty output.  This happens even if the
Java/Scala program has had its stdout and stderr redirected to files.

To demonstrate, compile this Java class, which is really just a simple
wrapper for ProcessBuilder.  It passes its main arguments directly to
ProcessBuilder, and creates threads to consume stdout/stderr (printing
them to its own stdout/stderr), then exits when that forked process
dies.

https://gist.github.com/jeff303/e5b44e220db20800752c932cbfbf7ed1
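
For readers who cannot open the gist, a wrapper along the lines
described above might look roughly like the following.  This is a
sketch written from the description only, not the gist's contents; the
class name ProcessRunnerSketch, the Result holder, and the exact log
labels are assumptions for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the wrapper described above (Java 8 compatible).
final class ProcessRunnerSketch {

    // What the child process produced.
    static final class Result {
        final int exitValue;
        final String stdout;
        final String stderr;

        Result(int exitValue, String stdout, String stderr) {
            this.exitValue = exitValue;
            this.stdout = stdout;
            this.stderr = stderr;
        }
    }

    // Starts a thread that drains one stream line by line, echoing each
    // line with a label and also collecting it into the given buffer.
    private static Thread consume(InputStream in, String label,
                                  PrintStream echo, StringBuilder sink) {
        Thread t = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(
                    new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = r.readLine()) != null) {
                    echo.println(label + " line: " + line);
                    sink.append(line).append('\n');
                }
            } catch (IOException e) {
                echo.println("error reading " + label + ": " + e);
            }
        });
        t.start();
        return t;
    }

    // Runs the command, drains both output streams, waits for exit.
    static Result run(List<String> command)
            throws IOException, InterruptedException {
        System.out.println("About to run: " + String.join(" ", command));
        Process p = new ProcessBuilder(command).start();
        StringBuilder out = new StringBuilder();
        StringBuilder err = new StringBuilder();
        Thread tOut = consume(p.getInputStream(), "stdout", System.out, out);
        Thread tErr = consume(p.getErrorStream(), "stderr", System.err, err);
        int exit = p.waitFor();
        tOut.join(); // make sure both streams are fully drained
        tErr.join();
        return new Result(exit, out.toString().trim(), err.toString().trim());
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.err.println("usage: java ProcessRunnerSketch <command> [args...]");
            System.exit(2);
        }
        Result r = run(Arrays.asList(args));
        System.out.println("exit value from process: " + r.exitValue);
        System.out.println("stderr from process: " + r.stderr);
        System.out.println("stdout from process: " + r.stdout);
    }
}
```

Note that the sketch leaves the child's stdin as the default pipe and
does nothing special about the controlling terminal.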

My environment is OS X 10.14.3, Spark 2.4.0 (installed via Homebrew),
Scala stable 2.12.8 (also Homebrew), and Oracle HotSpot JDK 1.8.0_181.
The behavior outlined below happens in both zsh 5.6.2 and bash
4.4.23(1).  Consider the following terminal session:

# BEGIN SHELL SESSION
# compile the ProcessBuilderRunner class
javac ProcessBuilderRunner.java

# sanity check; just invoke an echo command
java ProcessBuilderRunner bash -c 'echo hello world'
About to run: bash -c echo hello world
stdout line: hello world
exit value from process: 0
stderr from process:
stdout from process: hello world

# try running the "version check" sequence outlined above in foreground
java ProcessBuilderRunner bash -c "echo 'System.out.println(sc.version)' \
  | spark-shell 2>/dev/null | grep -A2 'System.out.println' \
  | grep -v 'System.out.println'"
About to run: bash -c echo 'System.out.println(sc.version)' |
spark-shell 2>/dev/null | grep -A2 'System.out.println' | grep -v
'System.out.println'
stdout line: 2.4.0
stdout line:
exit value from process: 0
stderr from process:
stdout from process: 2.4.0

# run the same thing, but in the background, redirecting outputs to files
java ProcessBuilderRunner bash -c "echo 'System.out.println(sc.version)' \
  | spark-shell 2>/dev/null | grep -A2 'System.out.println' \
  | grep -v 'System.out.println'" \
  >/tmp/spark-check.out 2>/tmp/spark-check.err &

# after a few seconds, the job is suspended due to tty output
[1]  + 8964 suspended (tty output)  java ProcessBuilderRunner bash -c
> /tmp/spark-check.out 2>

# foreground the job; it will complete shortly thereafter
fg

# confirm the stdout is correct
cat /tmp/spark-check.out
About to run: bash -c echo 'System.out.println(sc.version)' |
spark-shell 2>/dev/null | grep -A2 'System.out.println' | grep -v
'System.out.println'
stdout line: 2.4.0
stdout line:
exit value from process: 0
stdout from process: 2.4.0
# END SHELL SESSION

Why is the backgrounded Java process getting suspended when it tries
to invoke spark-shell here?  Theoretically, every program involved
should have a well-defined sink for its stdout, and in the foreground
everything works correctly.  Also of note: running the exact same thing
from a Scala class* results in the same behavior.  I am not that
knowledgeable on the finer points of tty handling, so hopefully someone
can point me in the right direction.  Thanks.

* Scala version:
https://gist.github.com/jeff303/2c2c3daa49a9cb588a0de6f1a73255b2
