Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-19 Thread Ashika Umanga Umagiliya
Hello

"spark.yarn.populateHadoopClasspath" is used in YARN mode correct?
However our Spark cluster is standalone cluster not using YARN.
We only connect to HDFS/Hive to access data.Computation is done on our
spark cluster running on K8s (not Yarn)
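
For context, this is roughly how we create the session; a minimal PySpark
sketch, where the master URL, host names and metastore URI are illustrative
placeholders rather than our real config:

# Spark 3.0 session on a standalone/K8s master, reading from an external
# HDFS and Hive metastore (all endpoints below are placeholders).
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("spark://spark-master:7077")   # standalone master, not YARN
         .appName("hdfs-hive-access")
         .config("hive.metastore.uris", "thrift://hive-metastore:9083")
         .enableHiveSupport()
         .getOrCreate())

# The data lives on the Hadoop 2.6 cluster; computation runs on our own cluster.
df = spark.read.parquet("hdfs://hadoop26-namenode:8020/warehouse/events")
df.show(5)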


On Mon, Jul 20, 2020 at 2:04 PM DB Tsai  wrote:

> In Spark 3.0, if you use the `with-hadoop` Spark distribution that has
> embedded Hadoop 3.2, you can set
> `spark.yarn.populateHadoopClasspath=false` so that the cluster's Hadoop
> classpath is not populated. In this scenario, Spark will use the Hadoop
> 3.2 client to connect to Hadoop 2.6, which should work fine. In fact,
> we have run production deployments this way for a while.
>
> On Sun, Jul 19, 2020 at 8:10 PM Ashika Umanga 
> wrote:
> >
> > Greetings,
> >
> > Hadoop 2.6 has been removed according to this ticket:
> > https://issues.apache.org/jira/browse/SPARK-25016
> >
> > We run our Spark cluster on K8s in standalone mode.
> > We access HDFS/Hive running on a Hadoop 2.6 cluster.
> > We've been using Spark 2.4.5 and are planning to upgrade to Spark 3.0.0.
> > However, we don't have any control over the Hadoop cluster, and it will
> > remain on 2.6.
> >
> > Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6?
> >
> > Best Regards,
>
>
>
> --
> Sincerely,
>
> DB Tsai
> --
> Web: https://www.dbtsai.com
> PGP Key ID: 42E5B25A8F7A82C1
>


-- 
Umanga
http://jp.linkedin.com/in/umanga
http://umanga.ifreepages.com


Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-19 Thread Prashant Sharma
Hi Ashika,

Hadoop 2.6 is no longer supported, and since it has not been maintained for
the last two years, it may have unpatched security issues.
From Spark 3.0 onwards we no longer support it; in other words, the codebase
has been changed in ways that mean Hadoop 2.6 won't work. If you are
determined, you can always apply a custom patch to the Spark codebase to
support it, but I would recommend moving to a newer Hadoop.

Thanks,

On Mon, Jul 20, 2020 at 8:41 AM Ashika Umanga 
wrote:

> Greetings,
>
> Hadoop 2.6 has been removed according to this ticket
> https://issues.apache.org/jira/browse/SPARK-25016
>
> We run our Spark cluster on K8s in standalone mode.
> We access HDFS/Hive running on a Hadoop 2.6 cluster.
> We've been using Spark 2.4.5 and are planning to upgrade to Spark 3.0.0.
> However, we don't have any control over the Hadoop cluster, and it will
> remain on 2.6.
>
> Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6?
>
> Best Regards,
>


Re: Spark UI

2020-07-19 Thread Piyush Acharya
https://www.youtube.com/watch?v=YgQgJceojJY (Xiao's video)





On Mon, Jul 20, 2020 at 8:03 AM Xiao Li  wrote:

> https://spark.apache.org/docs/3.0.0/web-ui.html is the official doc
> for Spark UI.
>
> Xiao
>
> On Sun, Jul 19, 2020 at 1:38 PM venkatadevarapu 
> wrote:
>
>> Hi,
>>
>> I'm looking for a tutorial/video/other material that explains the content
>> of the various tabs in the Spark Web UI.
>> Can someone point me to the relevant info?
>>
>> Thanks
>>
>>
>>
>>
>
> --
> 
>


Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-19 Thread Ashika Umanga
Greetings,

Hadoop 2.6 has been removed according to this ticket
https://issues.apache.org/jira/browse/SPARK-25016

We run our Spark cluster on K8s in standalone mode.
We access HDFS/Hive running on a Hadoop 2.6 cluster.
We've been using Spark 2.4.5 and are planning to upgrade to Spark 3.0.0.
However, we don't have any control over the Hadoop cluster, and it will
remain on 2.6.

Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6?

Best Regards,


Re: Spark UI

2020-07-19 Thread Xiao Li
https://spark.apache.org/docs/3.0.0/web-ui.html is the official doc
for Spark UI.

Xiao

On Sun, Jul 19, 2020 at 1:38 PM venkatadevarapu 
wrote:

> Hi,
>
> I'm looking for a tutorial/video/other material that explains the content
> of the various tabs in the Spark Web UI.
> Can someone point me to the relevant info?
>
> Thanks
>
>
>
>
>

-- 



Re: Overwrite Mode not Working Correctly in spark 3.0.0

2020-07-19 Thread anbutech
Hi,

When I use option 1, it completely overwrites the whole table, which is not
what I expect here; I'm running this for multiple tables with different hours.

When I use option 2, I get the following error:

Predicate references non-partition column 'json_feeds_flatten_data'. Only
the partition columns may be referenced: [table_name, y, m, d, h];

Thanks






Spark UI

2020-07-19 Thread venkatadevarapu
Hi,

I'm looking for a tutorial/video/other material that explains the content of
the various tabs in the Spark Web UI.
Can someone point me to the relevant info?

Thanks






Re: Schedule/Orchestrate spark structured streaming job

2020-07-19 Thread Piyush Acharya
Some of the workflow-engine options are compared here:
https://medium.com/@xunnan.xu/workflow-processing-engine-overview-2018-airflow-vs-azkaban-vs-conductor-vs-oozie-vs-amazon-step-90affc54d53b

A streaming query is effectively an infinitely running job, so you only need
to submit it once. If you run it with Trigger.Once, however, it behaves like a
finite batch job that a scheduler can kick off periodically.
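
A minimal PySpark sketch of that pattern (paths and names here are illustrative
placeholders, not from your setup):

# A streaming query run with Trigger.Once: it processes whatever input is
# available, then stops, so an external scheduler (Airflow, cron, etc.) can
# submit it periodically like a batch job.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("scheduled-stream").getOrCreate()

schema = StructType([StructField("value", StringType())])  # placeholder schema

df = (spark.readStream
      .schema(schema)
      .json("s3a://my-bucket/input/"))

query = (df.writeStream
         .format("parquet")
         .option("path", "s3a://my-bucket/output/")
         .option("checkpointLocation", "s3a://my-bucket/checkpoints/run1")
         .trigger(once=True)   # process available data once, then stop
         .start())
query.awaitTermination()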

Regards,
..Piyush


On Sun, Jul 19, 2020 at 11:01 PM anbutech  wrote:

> Hi Team,
>
> I'm very new to Spark Structured Streaming. Could you please guide me on how
> to schedule/orchestrate a Spark Structured Streaming job? Is there any
> scheduler similar to Airflow? As far as I know, Airflow doesn't support
> streaming jobs.
>
> Thanks
> Anbu
>
>
>
>
>


Re: Overwrite Mode not Working Correctly in spark 3.0.0

2020-07-19 Thread Piyush Acharya
Can you please send the error message? It would be very helpful for getting to
the root cause.

On Sun, Jul 19, 2020 at 10:57 PM anbutech  wrote:

> Hi Team,
>
> I'm seeing strange behavior with a PySpark DataFrame (Databricks, Delta,
> Spark 3.0.0 supported).
>
> I have tried the two options below to write the processed DataFrame into a
> Delta table with respect to the table's partition columns. Overwrite mode
> completely overwrites the whole table, and I couldn't figure out why the
> DataFrame fully overwrites it here.
>
> I'm also getting the following error while testing with option 2:
>
> Predicate references non-partition column 'json_feeds_flatten_data'. Only
> the partition columns may be referenced: [table_name, y, m, d, h];
>
> Could you please tell me why PySpark behaves like this? It would be very
> helpful to understand the mistake here.
>
> sample partition column values:
> ---
>
> table_name='json_feeds_flatten_data'
> y=2020
> m=7
> d=19
> h=0
>
> Option 1:
>
> partition_keys = ['table_name', 'y', 'm', 'd', 'h']
>
> (final_df
>   .withColumn('y', lit(y).cast('int'))
>   .withColumn('m', lit(m).cast('int'))
>   .withColumn('d', lit(d).cast('int'))
>   .withColumn('h', lit(h).cast('int'))
>   .write
>   .partitionBy(partition_keys)
>   .format("delta")
>   .mode('overwrite')
>   .saveAsTable(target_table)
> )
>
> Option 2:
>
> rep_wh = 'table_name={} AND y={} AND m={} AND d={} AND h={}'.format(
>     table_name, y, m, d, h)
> (final_df
>   .withColumn('y', lit(y).cast('int'))
>   .withColumn('m', lit(m).cast('int'))
>   .withColumn('d', lit(d).cast('int'))
>   .withColumn('h', lit(h).cast('int'))
>   .write
>   .format("delta")
>   .mode('overwrite')
>   .option('replaceWhere', rep_wh )
>   .saveAsTable(target_table)
> )
>
> Thanks
>
>
>
>
>


Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Piyush Acharya
Please try the maxBytesPerTrigger option; the files are probably big enough to
crash the JVM. Please also share some info on the executors and the files
(sizes, etc.).
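
Something along these lines. This is a sketch only, and note that
maxBytesPerTrigger is a per-source option (the Delta source supports it, while
the plain file source may only honour maxFilesPerTrigger depending on the
Spark version), so treat it as illustrative:

# Capping how much data each micro-batch reads from a file source on S3.
# Schema and paths are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName("s3-json-stream").getOrCreate()

json_schema = StructType([StructField("value", StringType())])  # placeholder

input_ds = (spark.readStream
            .schema(json_schema)
            .option("maxFilesPerTrigger", 100)       # fewer files per micro-batch
            # .option("maxBytesPerTrigger", "512m")  # if the source supports it
            .json("s3a://my-bucket/input/"))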

Regards,
..Piyush

On Sun, Jul 19, 2020 at 3:29 PM Rachana Srivastava
 wrote:

> *Issue:* I am trying to process 5000+ gzipped JSON files periodically
> from S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
> 1. Read the JSON schema and broadcast it to executors.
>
> 2. Read the stream:
>
>    Dataset<Row> inputDS = sparkSession.readStream().format("text")
>        .option("inferSchema", "true").option("header", "true")
>        .option("multiLine", true).schema(jsonSchema)
>        .option("mode", "PERMISSIVE").json(inputPath + "/*");
>
> 3. Process each file in a map:
>
>    Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
> 4. Write output to S3:
>
>    StreamingQuery query = ds.coalesce(1).writeStream()
>        .outputMode("append").format("csv") ... .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would pick
> up only that many files to process. Why are we getting OOM? If we have
> more than 3500 files, the system crashes with OOM.
>
>


Schedule/Orchestrate spark structured streaming job

2020-07-19 Thread anbutech
Hi Team,

I'm very new to Spark Structured Streaming. Could you please guide me on how to
schedule/orchestrate a Spark Structured Streaming job? Is there any scheduler
similar to Airflow? As far as I know, Airflow doesn't support streaming jobs.

Thanks
Anbu






Overwrite Mode not Working Correctly in spark 3.0.0

2020-07-19 Thread anbutech
Hi Team,

I'm seeing strange behavior with a PySpark DataFrame (Databricks, Delta,
Spark 3.0.0 supported).

I have tried the two options below to write the processed DataFrame into a
Delta table with respect to the table's partition columns. Overwrite mode
completely overwrites the whole table, and I couldn't figure out why the
DataFrame fully overwrites it here.

I'm also getting the following error while testing with option 2:

Predicate references non-partition column 'json_feeds_flatten_data'. Only
the partition columns may be referenced: [table_name, y, m, d, h];

Could you please tell me why PySpark behaves like this? It would be very
helpful to understand the mistake here.

sample partition column values:
---

table_name='json_feeds_flatten_data'
y=2020
m=7
d=19
h=0

Option 1:

partition_keys = ['table_name', 'y', 'm', 'd', 'h']

(final_df
  .withColumn('y', lit(y).cast('int'))
  .withColumn('m', lit(m).cast('int'))
  .withColumn('d', lit(d).cast('int'))
  .withColumn('h', lit(h).cast('int'))
  .write
  .partitionBy(partition_keys)
  .format("delta")
  .mode('overwrite')
  .saveAsTable(target_table)
)

Option 2:

rep_wh = 'table_name={} AND y={} AND m={} AND d={} AND h={}'.format(
    table_name, y, m, d, h)
(final_df
  .withColumn('y', lit(y).cast('int'))
  .withColumn('m', lit(m).cast('int'))
  .withColumn('d', lit(d).cast('int'))
  .withColumn('h', lit(h).cast('int'))
  .write
  .format("delta")
  .mode('overwrite')
  .option('replaceWhere', rep_wh )
  .saveAsTable(target_table)
)

Thanks
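
A possible cause of the replaceWhere error above, offered as a guess rather
than something confirmed in this thread: the string partition value in rep_wh
is not quoted, so Delta parses json_feeds_flatten_data as a column reference
instead of a string literal. A minimal sketch of the predicate with the value
quoted:

# Build the replaceWhere predicate with the string value quoted.
table_name = 'json_feeds_flatten_data'
y, m, d, h = 2020, 7, 19, 0

rep_wh = "table_name = '{}' AND y = {} AND m = {} AND d = {} AND h = {}".format(
    table_name, y, m, d, h)
print(rep_wh)
# table_name = 'json_feeds_flatten_data' AND y = 2020 AND m = 7 AND d = 19 AND h = 0
# The string is then passed as in Option 2: .option('replaceWhere', rep_wh)

Note also that, as far as I know, for a Delta table mode('overwrite') without
replaceWhere replaces the whole table, which would explain the behaviour seen
with Option 1.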






Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Sanjeev Mishra
Can you reduce maxFilesPerTrigger further and see if the OOM still occurs? If
it does, then the problem may be somewhere else.

> On Jul 19, 2020, at 5:37 AM, Jungtaek Lim  
> wrote:
> 
> Please provide logs and a heap dump file for the OOM case; otherwise no one
> can say what the cause is.
> 
> Add JVM options to driver/executor => -XX:+HeapDumpOnOutOfMemoryError 
> -XX:HeapDumpPath="...dir..."
> 
> On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava 
>  wrote:
> Issue: I am trying to process 5000+ gzipped JSON files periodically
> from S3 using Structured Streaming code.
>
> Here are the key steps:
>
> 1. Read the JSON schema and broadcast it to executors.
>
> 2. Read the stream:
>
>    Dataset<Row> inputDS = sparkSession.readStream().format("text")
>        .option("inferSchema", "true").option("header", "true")
>        .option("multiLine", true).schema(jsonSchema)
>        .option("mode", "PERMISSIVE").json(inputPath + "/*");
>
> 3. Process each file in a map:
>
>    Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
> 4. Write output to S3:
>
>    StreamingQuery query = ds.coalesce(1).writeStream()
>        .outputMode("append").format("csv") ... .start();
>
> maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up
> only that many files to process. Why are we getting OOM? If we have more
> than 3500 files, the system crashes with OOM.
> 
> 



Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Jungtaek Lim
Please provide logs and a heap dump file for the OOM case; otherwise no one can
say what the cause is.

Add JVM options to driver/executor => -XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath="...dir..."
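
For example, a sketch of setting these from PySpark (the executor option can be
set when building the session; the driver-side spark.driver.extraJavaOptions
generally has to be passed at submit time, e.g. in spark-defaults.conf or via
--conf, since the driver JVM is already running):

# Enable heap dumps on OOM for executors; the dump path is a placeholder.
from pyspark.sql import SparkSession

jvm_opts = "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps"

spark = (SparkSession.builder
         .appName("oom-debug")
         .config("spark.executor.extraJavaOptions", jvm_opts)
         .getOrCreate())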

On Sun, Jul 19, 2020 at 6:56 PM Rachana Srivastava
 wrote:

> *Issue:* I am trying to process 5000+ gzipped JSON files periodically
> from S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
> 1. Read the JSON schema and broadcast it to executors.
>
> 2. Read the stream:
>
>    Dataset<Row> inputDS = sparkSession.readStream().format("text")
>        .option("inferSchema", "true").option("header", "true")
>        .option("multiLine", true).schema(jsonSchema)
>        .option("mode", "PERMISSIVE").json(inputPath + "/*");
>
> 3. Process each file in a map:
>
>    Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
> 4. Write output to S3:
>
>    StreamingQuery query = ds.coalesce(1).writeStream()
>        .outputMode("append").format("csv") ... .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would pick
> up only that many files to process. Why are we getting OOM? If we have
> more than 3500 files, the system crashes with OOM.
>
>


OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Rachana Srivastava
Issue: I am trying to process 5000+ gzipped JSON files periodically from S3
using Structured Streaming code.

Here are the key steps:

1. Read the JSON schema and broadcast it to executors.

2. Read the stream:

   Dataset<Row> inputDS = sparkSession.readStream().format("text")
       .option("inferSchema", "true").option("header", "true")
       .option("multiLine", true).schema(jsonSchema)
       .option("mode", "PERMISSIVE").json(inputPath + "/*");

3. Process each file in a map:

   Dataset<String> ds = inputDS.map(x -> { ... }, Encoders.STRING());

4. Write output to S3:

   StreamingQuery query = ds.coalesce(1).writeStream()
       .outputMode("append").format("csv") ... .start();

maxFilesPerTrigger is set to 500, so I was hoping the stream would pick up only
that many files to process. Why are we getting OOM? If we have more than 3500
files, the system crashes with OOM.