Re: Do spark-submit overwrite the Spark session created manually?

2018-12-31 Thread Neo Chien
Hi

According to the official documentation quoted below, properties set through
SparkSession.builder (i.e. directly on the SparkConf) take the highest
precedence, which means any conflicting options passed to ‘spark-submit’
would be overridden by the values set in code. Please correct me if I am
wrong, many thanks.

Properties set directly on the SparkConf take highest precedence, then
flags passed to spark-submit or spark-shell, then options in the
spark-defaults.conf file.

https://spark.apache.org/docs/latest/configuration.html
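
For example, here is a minimal sketch illustrating the documented precedence
(assuming the jar is submitted in client mode with conflicting flags such as
--master yarn --name FromSubmit; the object name is just an illustration):

import org.apache.spark.sql.SparkSession

object PrecedenceCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[*]")   // set in code, takes precedence over --master
      .appName("Console")   // set in code, takes precedence over --name
      .getOrCreate()

    // Prints "local[*]" and "Console", not the values passed to spark-submit.
    println(spark.conf.get("spark.master"))
    println(spark.conf.get("spark.app.name"))

    spark.stop()
  }
}

Properties that are not set in code (e.g. spark.executor.memory) are still
taken from the spark-submit flags or spark-defaults.conf.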

Hope this helps,
Neo


On Tue, Jan 1, 2019 at 6:04 AM,  wrote:

> Hi Community ,
>
>
>
> When we submit a job using ‘spark-submit’, passing options like the ‘master
> url’, what should be the content of the main class?
>
>
>
> For example , if I create the session myself :
>
>
>
> val spark = SparkSession.builder
>   .master("local[*]")
>   .appName("Console")
>   .config("spark.app.id", "spark-es")
>   .getOrCreate()
>
>
>
> Will spark-submit overwrite parameters like the master URL, name, etc.?
>
>
>
> Thank you
>





Re: Corrupt record handling in spark structured streaming and from_json function

2018-12-31 Thread Colin Williams
Dear spark user community,

I have received some insight regarding filtering separate dataframes
in my spark-structured-streaming job. However, I wish to write each of
the dataframes mentioned in the Stack Overflow question below to a
separate location using a parquet writer. My initial impression is
that this requires multiple sinks, but I'm being pressured against that.
I think it might also be possible using the foreach / foreachBatch
writers, but I'm not sure how that interacts with the parquet writer,
or what the caveats of that approach are. Can some more advanced users
or developers suggest how to go about this, particularly without using
multiple streams?
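
For what it is worth, here is a rough, untested sketch of the foreachBatch
route (Spark 2.4+). The stream name parsedDF, the predicate column and the
output paths are hypothetical placeholders, not code from the question:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Split every micro-batch into two parquet locations from a single sink.
val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.persist()
  batch.filter(col("parsed").isNotNull)
    .write.mode("append").parquet("/data/valid")     // hypothetical path
  batch.filter(col("parsed").isNull)
    .write.mode("append").parquet("/data/corrupt")   // hypothetical path
  batch.unpersist()
}

parsedDF.writeStream
  .option("checkpointLocation", "/tmp/checkpoints/split")   // hypothetical path
  .foreachBatch(writeBatch)
  .start()

This keeps a single streaming query, at the cost that the two parquet writes
within a batch are not atomic with respect to each other.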


On Wed, Dec 26, 2018 at 6:01 PM Colin Williams
 wrote:
>
> https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
>
> On Wed, Dec 26, 2018 at 2:42 PM Colin Williams
>  wrote:
> >
> > From my initial impression it looks like I'd need to create my own
> > `from_json` using `jsonToStructs` as a reference, but handle
> > `case _: BadRecordException => null` or similar in order to write the
> > non-matching string to a corrupt-records column.
> >
> > On Wed, Dec 26, 2018 at 1:55 PM Colin Williams
> >  wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to figure out how I can write records that don't match a
> > > json read schema via spark structured streaming to an output sink /
> > > parquet location. Previously I did this in batch using the corrupt-record
> > > column feature of the batch reader. But in this spark structured streaming
> > > job I'm reading a string from kafka and using from_json on the value of
> > > that string. If it doesn't match my schema then from_json returns null for
> > > all the rows and does not populate a corrupt-record column. But I want to
> > > somehow obtain the source kafka string in a dataframe, and write it to an
> > > output sink / parquet location.
> > >
> > > def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType) = {
> > >   val jsonDataFrame = rawKafkaDataFrame.select(col("value").cast("string"))
> > >   jsonDataFrame.select(from_json(col("value"), schema)).select("jsontostructs(value).*")
> > > }
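
One possible direction (an untested sketch, not the original author's code):
keep the raw Kafka string next to the parsed struct, so rows where from_json
returns null can be routed, together with their original payload, to a
corrupt-record location:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

def getKafkaEventDataFrame(rawKafkaDataFrame: DataFrame, schema: StructType): DataFrame = {
  rawKafkaDataFrame
    .select(col("value").cast("string").as("raw"))         // original Kafka payload
    .withColumn("parsed", from_json(col("raw"), schema))   // null when the schema does not match
}

Downstream, parsed IS NULL marks the records that failed to parse, while the
raw column still carries the source string for writing to the corrupt output
sink / parquet location.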




structure streaming dataframe/dataset join (Java)

2018-12-31 Thread Mann Du
Hello there,

I am trying to calculate a simple difference between adjacent rows (ts = ts - 10)
of a column of a dataset by joining the dataset with itself. The SQL expression
was working for static datasets (trackT) as:

Dataset<Row> trackDiff = spark.sql("select a.*, "
    + "a.posX - coalesce(b.posX, 0) as delX "
    + "from trackT a left join trackT b "
    + "on a.ts = b.ts - 10");



However, if the dataset is a structured streaming dataset, Spark reports
that "Stream-stream outer join between two streaming DataFrame/Datasets
is not supported without a watermark in the join keys". Since the dataset
joins itself, I was thinking of just using an arbitrary time interval as
the watermark to create two streaming datasets and joining them:


Dataset<Row> trackWM1 = trackT.withColumn("ts1", trackT.col("timestamp"))
    .drop("timestamp").withWatermark("ts1", "10 second");

Dataset<Row> trackWM2 = trackT.withColumn("ts2", trackT.col("timestamp"))
    .drop("timestamp").withWatermark("ts2", "10 second");

Dataset<Row> joinDF = trackWM1.join(trackWM2, "???");


I am stuck and don't know how to do for this streaming dataset what I intended
to do for the static datasets. The join seems to mean something different once
I add the time-interval watermark, since the original query was joining the
table with itself on different rows. Can someone explain how I can realize the
original logic on a streaming dataset? Perhaps I don't even need a join?


Thanks.


Best,


Mann
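
One possible direction, sketched below in Scala rather than Java and untested
(the Java API is analogous). It assumes ts is the event-time timestamp column
and that the 10-unit offset means 10 seconds; the documented pattern for
stream-stream outer joins is a watermark on both sides plus a time-range
condition on the event-time columns:

import org.apache.spark.sql.functions.expr

// Two watermarked views of the same stream; the event-time column is renamed
// on each side so the join condition below is unambiguous.
val a = trackT.withColumnRenamed("ts", "tsA").withWatermark("tsA", "10 seconds").alias("a")
val b = trackT.withColumnRenamed("ts", "tsB").withWatermark("tsB", "10 seconds").alias("b")

// Pair each row with the row 10 seconds ahead of it (a.ts = b.ts - 10 in the
// original SQL) and compute the positional difference.
val trackDiff = a
  .join(b,
    expr("tsA = tsB - interval 10 seconds AND tsA BETWEEN tsB - interval 20 seconds AND tsB"),
    "leftOuter")
  .selectExpr("a.*", "a.posX - coalesce(b.posX, 0) as delX")

Whether a given Spark version accepts the exact-offset equality as a
state-cleanup constraint is something to verify, hence the redundant BETWEEN
range. If each key only needs its previous row, a stateful operator such as
flatMapGroupsWithState may end up simpler than a self-join.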


Do spark-submit overwrite the Spark session created manually?

2018-12-31 Thread email
Hi Community , 

 

When we submit a job using 'spark-submit', passing options like the 'master
url', what should be the content of the main class?

 

For example , if I create the session myself : 

 

val spark = SparkSession.builder
  .master("local[*]")
  .appName("Console")
  .config("spark.app.id", "spark-es")
  .getOrCreate()

 

Will spark-submit overwrite parameters like the master URL, name, etc.?

 

Thank you



Spark jdbc postgres numeric array

2018-12-31 Thread Alexey
Hi,

I came across strange behavior when dealing with postgres columns of type 
numeric[] using Spark 2.3.2, PostgreSQL 10.4, 9.6.9.
Consider the following table definition:

create table test1
(
   v  numeric[],
   d  numeric
);

insert into test1 values('{.222,.332}', 222.4555);

When reading the table into a Dataframe, I get the following schema:

root
 |-- v: array (nullable = true)
 |    |-- element: decimal(0,0) (containsNull = true)
 |-- d: decimal(38,18) (nullable = true)

Notice that precision and scale were not specified for either column, yet for
the array element both were set to 0, while for the plain numeric column the
defaults (38,18) were applied.

Later, when I try to read the Dataframe, I get the following error:

java.lang.IllegalArgumentException: requirement failed: Decimal precision 4 
exceeds max precision 0
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:114)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:453)
at 
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$16$$anonfun$apply$6$$anonfun$apply$7.apply(JdbcUtils.scala:474)
...

I would expect to get array elements of type decimal(38,18) and no error when 
reading in this case.
Should this be considered a bug? Is there a workaround other than changing the 
column array type definition to include explicit precision and scale?

Best regards,
Alexey
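
One workaround that might help until this is sorted out (an untested sketch,
assuming an existing SparkSession named spark; the connection settings are
placeholders): read the array as text[] through a subquery in dbtable, so the
decimal(0,0) element type never appears, and cast back to decimal on the
Spark side:

import org.apache.spark.sql.functions.col

val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/testdb")   // placeholder connection
  .option("user", "test")                                     // placeholder credentials
  .option("password", "test")
  .option("dbtable", "(select v::text[] as v, d from test1) as t")
  .load()
  .withColumn("v", col("v").cast("array<decimal(38,18)>"))    // restore the intended element type

df.printSchema()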





Re: Postgres Read JDBC with COPY TO STDOUT

2018-12-31 Thread Nicolas Paris
The resulting library is on GitHub: https://github.com/EDS-APHP/spark-postgres
While there is room for improvement, it is also able to read/write postgres
data with the COPY statement, allowing **very large** tables to be read and
written without problems.
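
For readers who want to see the core mechanism, a minimal sketch (untested;
the connection string, table name and partition predicate are placeholders)
of issuing COPY ... TO STDOUT through the PostgreSQL JDBC driver and streaming
the CSV into an arbitrary OutputStream, e.g. one local or HDFS file per
executor:

import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}
import java.sql.DriverManager
import org.postgresql.PGConnection

// Stream one partition of a table as CSV via COPY TO STDOUT; returns the row count.
def copyPartitionToCsv(jdbcUrl: String, copySql: String, out: OutputStream): Long = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    val copyApi = conn.unwrap(classOf[PGConnection]).getCopyAPI
    copyApi.copyOut(copySql, out)
  } finally {
    out.close()
    conn.close()
  }
}

// Example: one COPY per executor/partition; the resulting CSV files can then
// be read back with spark.read.csv(...) to build the dataframe.
val rows = copyPartitionToCsv(
  "jdbc:postgresql://localhost:5432/testdb?user=test&password=test",          // placeholder
  "COPY (SELECT * FROM big_table WHERE mod(id, 4) = 0) TO STDOUT WITH (FORMAT CSV)",
  new BufferedOutputStream(new FileOutputStream("/tmp/big_table_part0.csv")))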


On Sat, Dec 29, 2018 at 01:06:00PM +0100, Nicolas Paris wrote:
> Hi
> 
> The spark postgres JDBC reader is limited because it relies on basic
> SELECT statements with fetchsize, and it crashes on large tables even if
> multiple partitions are set up with lower/upper bounds.
> 
> I am about to write a new postgres JDBC reader based on "COPY TO STDOUT".
> It would stream the data and produce CSV on the fileSystem (hdfs or
> local). The CSV would then be parsed with the spark CSV reader to
> produce a dataframe. It would issue multiple "COPY TO STDOUT" statements,
> one for each executor.
> 
> Right now, I am able to loop over an output stream and write the string
> somewhere. I am wondering what would be the best way to process the
> resulting string stream, in particular the best way to direct it to an
> hdfs folder, or maybe to parse it on the fly into a dataframe.
> 
> Thanks,
> 
> -- 
> nicolas
> 

-- 
nicolas
