Spark Structured Streaming keeps on consuming usercache

2020-07-20 Thread Yong Yuan
It seems the following structured streaming code keeps on consuming the
usercache until all disk space is occupied.

val monitoring_stream = monitoring_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) batchDF.show()
  }


I did not even call batchDF.persist(). Do I really need to save/write
batchDF somewhere to release the usercache?

I also tried to call spark.catalog.clearCache() explicitly in a loop, which
does not help solve this problem either.

The figure below also shows that the capacity of the cluster decreases as this
code runs.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Spark ETL use case

2020-07-20 Thread codingkapoor
Can we use Spark as an ETL service? Suppose we have data written to our
Cassandra data stores and we need to transform and load it into Vertica
for analytics purposes. Since Spark is already a very well designed
distributed system, wouldn't it make sense to load the data from Cassandra into
Spark datasets and then push it, after transformations, to Vertica? This way
we won't need to implement the ETL service ourselves and can make use of an
already performant system that is in place.

Please advise. TIA.
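For concreteness, this is roughly the flow I have in mind (a sketch only; it
assumes the spark-cassandra-connector and a Vertica JDBC driver are on the
classpath, and the keyspace, table, host and transformation below are made-up
placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cassandra-to-vertica-etl").getOrCreate()
import spark.implicits._

// Read from Cassandra (keyspace/table are placeholders).
val source = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "my_keyspace", "table" -> "events"))
  .load()

// Placeholder transformation; the real business logic would go here.
val transformed = source.filter($"event_time" >= "2020-01-01")

// Write to Vertica over JDBC (URL, table and driver class are placeholders).
transformed.write
  .format("jdbc")
  .option("url", "jdbc:vertica://vertica-host:5433/analytics")
  .option("driver", "com.vertica.jdbc.Driver")
  .option("dbtable", "public.events")
  .mode("append")
  .save()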



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Deployment Strategy

2020-07-20 Thread codingkapoor
I want to understand how best to deploy Spark close to a data source or sink.

Let's say I have a Vertica cluster that I need to run a Spark job against. In
that case, how should the Spark cluster be set up?

1. Should we run a Spark worker node on each Vertica cluster node?
2. How does shuffling play out in that setup?
3. Also, what would the deployment look like in a managed cluster deployment
such as Kubernetes?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread fansparker
Does anybody know if there is a way to get the persisted table's schema
updated when the underlying custom data source schema is changed? Currently,
we have to drop and re-create the table. 



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 3.0 with Hadoop 2.6 HDFS/Hive

2020-07-20 Thread DB Tsai
In Spark 3.0, if you use the `with-hadoop` Spark distribution that has
embedded Hadoop 3.2, you can set
`spark.yarn.populateHadoopClasspath=false` to avoid populating the
cluster's Hadoop classpath. In this scenario, Spark will use the Hadoop
3.2 client to connect to Hadoop 2.6, which should work fine. In fact,
we have been running a production deployment this way for a while.
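For example, a minimal sketch of the idea (the app name is illustrative; on
YARN in cluster mode the property normally goes into spark-defaults.conf or is
passed with --conf at submit time rather than being set in code):

import org.apache.spark.sql.SparkSession

// Spark 3.0 "with-hadoop" build talking to Hadoop 2.6 HDFS/Hive, using the
// embedded Hadoop 3.2 client jars instead of the cluster's Hadoop classpath.
val spark = SparkSession.builder()
  .appName("spark3-against-hadoop26")
  .config("spark.yarn.populateHadoopClasspath", "false")
  .enableHiveSupport()
  .getOrCreate()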

On Sun, Jul 19, 2020 at 8:10 PM Ashika Umanga  wrote:
>
> Greetings,
>
> Hadoop 2.6 has been removed according to this ticket 
> https://issues.apache.org/jira/browse/SPARK-25016
>
> We run our Spark cluster on K8s in standalone mode.
> We access HDFS/Hive running on a Hadoop 2.6 cluster.
> We've been using Spark 2.4.5 and planning on upgrading to Spark 3.0.0
> However, we dont have any control over the Hadoop cluster and it will remain 
> in 2.6
>
> Is Spark 3.0 still compatible with HDFS/Hive running on Hadoop 2.6 ?
>
> Best Regards,



-- 
Sincerely,

DB Tsai
--
Web: https://www.dbtsai.com
PGP Key ID: 42E5B25A8F7A82C1

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
I am new to Spark streaming and trying to understand the Spark UI and to do
optimizations.

1. Processing at the executors took less time than at the driver. How can I
optimize to make the driver tasks faster?
2. We are using dstream.repartition(defaultParallelism*3) to increase
parallelism, which is causing heavy shuffles. Is there any option to avoid
calling repartition manually and so reduce data shuffles?
3. I am also trying to understand how 6 tasks in stage 1 and 199 tasks in
stage 2 got created.

*hardware configuration:* executor-cores: 3; driver-cores: 3;
dynamicAllocation is true; 
initial,min,maxExecutors: 25

StackOverFlow link for screenshots:
https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using pyspark with Spark 2.4.3 a MultiLayerPerceptron model gives inconsistent outputs if a large amount of data is fed into it and at least one of the model outputs is fed to a Python UDF.

2020-07-20 Thread Ben Smith
Thanks for that. I have played with this a bit more after your feedback and
found:

- I can only recreate the problem with Python 3.6+. If I switch between Python
2.7, Python 3.6 and Python 3.7, the problem occurs with Python 3.6 and 3.7 but
not with Python 2.7.
- I have used minimal Python virtual environments with the same dependencies
between Python 2.7 and Python 3.x (basically nothing installed except
numpy), so I don't think it's a Python dependency version issue.
- I have compared the DAGs and execution plans generated by Spark and they
look the same between the working and broken cases, so I don't think the
Python version is impacting Spark's execution plan.

Note that in the Python 3.6+ case I still can't recreate the problem every
time, but it does seem to happen the majority of the times I try.

I also tested with Spark 2.4.6 and still get the problem. I cannot try
3.0.0, as that hits a fatal exception due to defect SPARK-32232.

The workaround you suggest isn't going to work in my case, as the code sample
I provided is a simplified version of what I'm actually doing in Python.
However, I think I have a workaround where I force a cache/persist of the
data after the model has transformed the features, since I cannot recreate the
issue if the Python UDF is run on the cached data in a separate action.

I will add another message if I find any more info



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Structured Streaming keeps on consuming usercache

2020-07-20 Thread Piyush Acharya
Can you try calling batchDF.unpersist() once the work is done in the loop?
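Something along these lines (a sketch based on your snippet; monitoring_df is
your DataFrame, and the .start() call is added here just for completeness):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val monitoring_stream = monitoring_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) batchDF.show()
    batchDF.unpersist() // release any blocks cached for this micro-batch
  }
  .start()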

On Mon, Jul 20, 2020 at 3:38 PM Yong Yuan  wrote:

> It seems the following structured streaming code keeps on consuming
> usercache until all disk space are occupied.
>
> val monitoring_stream =
> monitoring_df.writeStream
> .trigger(Trigger.ProcessingTime("120  seconds"))
> .foreachBatch {
> (batchDF: DataFrame, batchId: Long) =>
> if(!batchDF.isEmpty)   batchDF.show()
> }
>
>
> I even did not call batchDF.persist(). Do I need to really save/write
> batchDF to somewhere to release the usercache?
>
> I also tried to call spark.catalog.clearCache() explicitly in a loop,
> which does not help solve this problem either.
>
> Below figure also shows the capacity of the cluster is decreasing with the
> running of these codes.
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread Piyush Acharya
Do you want to merge schemas when the incoming data's schema changes?

spark.conf.set("spark.sql.parquet.mergeSchema", "true")

https://kontext.tech/column/spark/381/schema-merging-evolution-with-parquet-in-spark-and-hive
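The same behaviour can also be requested per read instead of session-wide (a
sketch; the path is a placeholder):

// Merge the schemas of the Parquet part files for this read only.
val df = spark.read.option("mergeSchema", "true").parquet("/path/to/table")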


On Mon, Jul 20, 2020 at 3:48 PM fansparker  wrote:

> Does anybody know if there is a way to get the persisted table's schema
> updated when the underlying custom data source schema is changed?
> Currently,
> we have to drop and re-create the table.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread Russell Spitzer
The last time I looked into this, the answer was no. I believe that since there
is a Spark session internal relation cache, the only way to update a session's
information was a full drop and create. That was my experience with a
custom Hive metastore and entries read from it. I could change the entries
in the metastore underneath the session, but since the session cached the
relation lookup I couldn't get it to reload the metadata.

DataSourceV2 does make this easier, though.

On Mon, Jul 20, 2020, 5:17 AM fansparker  wrote:

> Does anybody know if there is a way to get the persisted table's schema
> updated when the underlying custom data source schema is changed?
> Currently,
> we have to drop and re-create the table.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Does Spark support column scan pruning for array of structs?

2020-07-20 Thread Haijia Zhou
I have a data frame with the following schema:
household
root
 |-- country_code: string (nullable = true)
 |-- region_code: string (nullable = true)
 |-- individuals: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- individual_id: string (nullable = true)
 |||-- ids: array (nullable = true)
 ||||-- element: struct (containsNull = true)
 |||||-- id_last_seen: date (nullable = true)
 |||||-- type: string (nullable = true)
 |||||-- value: string (nullable = true)
 |||||-- year_released: integer (nullable = true)

I can use the following code to find households that contain at least one
device that was released after the year 2018

val sql = """
  select household_id
  from household
  where exists(individuals, id -> exists(id.ids, dev -> dev.year_released > 2018))
"""
val v = spark.sql(sql)

It works well; however, I found that the query planner was not able to prune the
unneeded columns. Spark instead has to read all columns of the nested
structures.

I tested this with Spark 2.4.5 and 3.0.0 and got the same result.
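For reference, one way to see this is to inspect the ReadSchema of the scan in
the physical plan (a sketch; the nested schema pruning flag shown below exists
in 2.4+ but is off by default there, and whether it covers arrays of structs
accessed through exists() is exactly what I am unsure about):

// With pruning, the scan's ReadSchema would list only the fields the query
// touches (e.g. year_released inside individuals.ids) instead of the full structs.
v.explain(true)

// Nested schema pruning for Parquet is controlled by this flag (off by default in 2.4):
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")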

I just wonder whether Spark supports, or will add support for, column scan
pruning for an array of structs.


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without your code this is hard to determine, but here are a few notes.

The number of partitions is usually dictated by the input source, see if it
has any configuration which allows you to increase input splits.

I'm not sure why you think some of the code is running on the driver. All
methods on dataframes and RDDs will be executed on the executors; foreachPartition
does not run locally on the driver.

The difference in partitions is probably the shuffle you added with
repartition. I would actually not be surprised if your code ran faster
without the repartitioning. But again, without the real code it is very
hard to say.

On Mon, Jul 20, 2020, 6:33 AM forece85  wrote:

> I am new to spark streaming and trying to understand spark ui and to do
> optimizations.
>
> 1. Processing at executors took less time than at driver. How to optimize
> to
> make driver tasks fast ?
> 2. We are using dstream.repartition(defaultParallelism*3) to increase
> parallelism which is causing high shuffles. Is there any option to avoid
> repartition manually to reduce data shuffles.
> 3. Also trying to understand how 6 tasks in stage1 and 199 tasks in stage2
> got created?
>
> *hardware configuration:* executor-cores: 3; driver-cores: 3;
> dynamicAllocation is true;
> initial,min,maxExecutors: 25
>
> StackOverFlow link for screenshots:
>
> https://stackoverflow.com/questions/62993030/spark-dstream-help-needed-to-understand-ui-and-how-to-set-parallelism-or-defau
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
Thanks for the reply. Please find pseudo code below. We are fetching DStreams
from a Kinesis stream every 10 seconds, performing transformations, and finally
persisting to HBase tables using batch insertions.

dStream = dStream.repartition(jssc.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords -> {
    Connection hbaseConnection = ConnectionUtil.getHbaseConnection();
    List listOfRecords = new ArrayList<>();
    while (partitionOfRecords.hasNext()) {
        try {
            listOfRecords.add(partitionOfRecords.next());

            if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
                continue;

            List finalListOfRecords = listOfRecords;
            doJob(finalListOfRecords, primaryConnection, lookupsConnection);
            listOfRecords = new ArrayList<>();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}));

We are batching every 10 records and sending them to the doJob method, where the
actual transformations happen and every batch gets bulk inserted into the HBase
table.

With the above code, can we guess what is happening at Job 1 (6 tasks) and how
to reduce that time?
Mainly, how can we effectively set parallelism while avoiding the repartition()
method?

Thanks in Advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread forece85
Thanks for the reply. Please find pseudo code below. It reads DStreams every
10 seconds from a Kinesis stream and, after transformations, pushes into HBase.
Once we have the DStream, we use the code below to repartition and do the
processing:

dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords -> {
    Connection hbaseConnection = ConnectionUtil.getHbaseConnection();
    List listOfRecords = new ArrayList<>();
    while (partitionOfRecords.hasNext()) {
        listOfRecords.add(partitionOfRecords.next());

        if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
            continue;

        List finalListOfRecords = listOfRecords;
        doJob(finalListOfRecords, hbaseConnection);
        listOfRecords = new ArrayList<>();
    }
}));


We batch every 10 records and pass them to the doJob method, where we batch
process and bulk insert into HBase.

With the above code, is it possible to tell what is happening at Job 1 (6
tasks)? And how can we replace the repartition method efficiently?

Thanks in Advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to monitor the throughput and latency of the combineByKey transformation in Spark 3?

2020-07-20 Thread Felipe Gutierrez
Hi community,

I built a simple count-and-sum Spark application which uses the
combineByKey transformation [1], and I would like to monitor the
throughput in/out of this transformation and the latency that
combineByKey spends pre-aggregating tuples. Ideally, for the latency I
would like to take the average over the last 30 seconds using a
histogram, plus the 99th percentile.

I was imagining adding dropwizard metrics [2] to the combiner
function that I pass to combineByKey, but I am confused because
there are two more functions that I must pass to combineByKey.

How would you suggest I implement this monitoring strategy?

Thanks,
Felipe
[1] 
https://github.com/felipegutierrez/explore-spark/blob/master/src/main/scala/org/sense/spark/app/combiners/TaxiRideCountCombineByKey.scala#L40
[2] https://metrics.dropwizard.io/4.1.2/getting-started.html
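For concreteness, this is roughly what I was imagining (a sketch only; the
registry object, metric names and the count-and-sum types are illustrative, and
wiring the registry into a reporter or Spark's metrics sinks is left out):

import com.codahale.metrics.MetricRegistry
import org.apache.spark.rdd.RDD

// Per-executor registry; accessed statically from the combiner closures.
object CombinerMetrics {
  lazy val registry    = new MetricRegistry()
  lazy val createTimer = registry.timer("combineByKey.createCombiner")
  lazy val mergeMeter  = registry.meter("combineByKey.mergeValue.in")
}

// Count-and-sum per key, with the first two combineByKey functions instrumented.
def countAndSum(rides: RDD[(String, Double)]): RDD[(String, (Long, Double))] =
  rides.combineByKey(
    (v: Double) => {
      val ctx = CombinerMetrics.createTimer.time() // latency of creating a combiner
      try { (1L, v) } finally { ctx.stop() }
    },
    (acc: (Long, Double), v: Double) => {
      CombinerMetrics.mergeMeter.mark()            // throughput into the pre-aggregation
      (acc._1 + 1L, acc._2 + v)
    },
    (a: (Long, Double), b: (Long, Double)) =>
      (a._1 + b._1, a._2 + b._2)                   // merging partial aggregates
  )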

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark UI

2020-07-20 Thread ArtemisDev
Thanks Xiao for the info.  I was looking for this, too.  This page 
wasn't linked from anywhere on the main doc page (Overview) or any of 
the pull-down menus.  Someone should remind the doc team to update the 
table of contents on the Overview page.


-- ND

On 7/19/20 10:30 PM, Xiao Li wrote:
https://spark.apache.org/docs/3.0.0/web-ui.html is the official doc 
for Spark UI.


Xiao

On Sun, Jul 19, 2020 at 1:38 PM venkatadevarapu  wrote:


Hi,

I'm looking for a tutorial/video/material which explains the content of
the various tabs in the Spark Web UI.
Can someone direct me to the relevant info.

Thanks



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org







Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread fansparker
Thanks Russell. This commit
<https://gite.lirmm.fr/yagoubi/spark/commit/6463e0b9e8067cce70602c5c9006a2546856a9d6#fecff1a3ad108a52192ba9cd6dd7b11a3d18871b_0_141>
shows that "refreshTable" and "invalidateTable" could be used to reload
the metadata, but they do not work in our case. I have tried to invoke
"schema()" with the updated schema from "buildScan()" as well.

It would be helpful to have this feature enabled for DataSourceV1 as the
schema evolves; I will check if this is a change that can be made.

You mentioned that it works in DataSourceV2. Is there an implementation
sample for persistent tables with DataSourceV2 that works with Spark 2.4.4?
Thanks again.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming - Set Parallelism and Optimize driver

2020-07-20 Thread Russell Spitzer
Without seeing the rest (and you can confirm this by looking at the DAG
visualization in the Spark UI) I would say your first stage with 6
partitions is:

Stage 1: Read data from Kinesis (or read blocks from the receiver, not sure
which method you are using) and write shuffle files for the repartition
Stage 2: Read shuffle files and do everything else

In general I think the biggest issue here is probably not the distribution
of tasks, which based on your UI reading were quite small, but instead the
parallelization of the write operation, since it is done synchronously. I
would suggest that instead of trying to increase your parallelism by
repartitioning, you attempt to have "doJob" run asynchronously and allow for
more parallelism within an executor rather than using multiple executor
threads/JVMs.

That said, you would probably run faster if you just skipped the repartition,
judging by the speed of the second stage.
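For what it's worth, a rough sketch of the asynchronous idea, in Scala (the
pool size, the batch size and the doJob signature are placeholders for whatever
your real code uses):

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// Inside foreachPartition: hand each 10-record batch to a small per-partition
// pool instead of calling doJob synchronously, then wait for all batches so
// failures still fail the task.
def processPartition[T](records: Iterator[T], doJob: Seq[T] => Unit): Unit = {
  val pool = Executors.newFixedThreadPool(4) // pool size is an assumption
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    val pending = records.grouped(10).map(batch => Future(doJob(batch))).toList
    Await.result(Future.sequence(pending), Duration.Inf)
  } finally pool.shutdown()
}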

On Mon, Jul 20, 2020 at 8:23 AM forece85  wrote:

> Thanks for reply. Please find sudo code below. Its Dstreams reading for
> every
> 10secs from kinesis stream and after transformations, pushing into hbase.
> Once got Dstream, we are using below code to repartition and do processing:
>
> dStream = dStream.repartition(javaSparkContext.defaultMinPartitions() * 3);
> dStream.foreachRDD(javaRDD -> javaRDD.foreachPartition(partitionOfRecords
> ->
> {
>Connection hbaseConnection= ConnectionUtil.getHbaseConnection();
>List listOfRecords = new ArrayList<>();
>while (partitionOfRecords.hasNext()) {
>  listOfRecords.add(partitionOfRecords.next());
>
>  if (listOfRecords.size() < 10 && partitionOfRecords.hasNext())
> continue;
>
>  List finalListOfRecords = listOfRecords;
>  doJob(finalListOfRecords, hbaseConnection);
>  listOfRecords = new ArrayList<>();
>}
> }));
>
>
> We are batching every 10 records and pass to doJob method where we batch
> process and bulk insert to hbase.
>
> With above code, will it be able to tell what is happening at job 1 -> 6
> tasks? and how to replace repartition method efficiently.
>
> Thanks in Advance
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread Russell Spitzer
The code you linked to is very old and I don't think that method works
anymore (HiveContext no longer exists). My latest attempt at trying
this was Spark 2.2, and I ran into the issues I wrote about before.

In DSV2 it's done via a catalog implementation, so you can basically write
a new catalog that can create tables and such with whatever metadata you
like. I'm not sure there is a Hive metastore catalog implemented yet in
DSV2; I also think that if there were, it would only be in Spark 3.0.

On Mon, Jul 20, 2020 at 10:05 AM fansparker  wrote:

> Thanks Russell.  This
> <
> https://gite.lirmm.fr/yagoubi/spark/commit/6463e0b9e8067cce70602c5c9006a2546856a9d6#fecff1a3ad108a52192ba9cd6dd7b11a3d18871b_0_141>
>
> shows that the "refreshTable" and "invalidateTable" could be used to reload
> the metadata but they do not work in our case. I have tried to invoke the
> "schema()" with the updated schema from the "buildScan()" as well.
>
> It will be helpful to have this feature enabled for DataSourceV1 as the
> schema evolves, i will check if this is an change that can be made.
>
> You mentioned that it works in DataSourceV2. Is there an implementation
> sample for persistent tables DataSourceV2 that works with spark 2.4.4?
> Thanks again.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread fansparker
Makes sense, Russell. I am trying to figure out if there is a way to force a
metadata reload in "createRelation" when the schema provided in the new
SparkSession is different from the existing metadata schema.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Future timeout

2020-07-20 Thread Amit Sharma
Please help on this.


Thanks
Amit

On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma  wrote:

> Hi, sometimes my spark streaming job throw this exception  Futures timed
> out after [300 seconds].
> I am not sure where is the default timeout configuration. Can i increase
> it. Please help.
>
>
>
> Thanks
> Amit
>
>
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
> at
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
> at
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
> at
> org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
> at
> org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
> at
> org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101)
> at
> org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
> at
> org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200)
> at
> org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
> at
> org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
>


Insert overwrite using select within same table

2020-07-20 Thread Utkarsh Jain
Hello community,

I am not sure whether using 'insert overwrite using select within the same
table' via Spark SQL is a good approach, or whether there is a better
alternative approach for this.


Re: Garbage collection issue

2020-07-20 Thread Jeff Evans
What is your heap size, and JVM vendor/version?  Generally, G1 only
outperforms CMS on large heap sizes (ex: 31GB or larger).

On Mon, Jul 20, 2020 at 1:22 PM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma  wrote:
>
>> Hi All, i am running the same batch job in my two separate spark
>> clusters. In one of the clusters it is showing GC warning  on spark -ui
>> under executer tag. Garbage collection is taking longer time around 20 %
>> while in another cluster it is under 10 %. I am using the same
>> configuration in my spark submit and using G1GC .
>>
>> Please let me know what could be the reason for GC slowness.
>>
>>
>> Thanks
>> Amit
>>
>


Re: Garbage collection issue

2020-07-20 Thread Amit Sharma
Please help on this.


Thanks
Amit

On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma  wrote:

> Hi All, i am running the same batch job in my two separate spark clusters.
> In one of the clusters it is showing GC warning  on spark -ui  under
> executer tag. Garbage collection is taking longer time around 20 %  while
> in another cluster it is under 10 %. I am using the same configuration in
> my spark submit and using G1GC .
>
> Please let me know what could be the reason for GC slowness.
>
>
> Thanks
> Amit
>


Re: Garbage collection issue

2020-07-20 Thread Russell Spitzer
High GC is relatively hard to debug in general, but I can give you a few
pointers. This basically means that the time spent cleaning up unused
objects is high, which usually means memory is being used and thrown away
rapidly. It can also mean that GC is ineffective and is being run many
times in an attempt to find things to free up. Since each run is not very
effective (because many things are still in use and cannot be thrown out),
it has to run more often.

So usually the easiest thing to do if possible is to increase the heap size
and hope that you are just seeing GC pressure because you need more free
memory than the JVM had. So I would recommend that as a first step,
increase the Executor Heap.
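For example (the values are illustrative; in practice these are usually passed
at submit time with --executor-memory / --conf rather than set in code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")          // executor JVM heap (illustrative)
  .config("spark.executor.memoryOverhead", "2g")  // off-heap overhead on YARN/K8s (illustrative)
  .getOrCreate()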

The longer and harder thing to do is to see exactly where object allocation
is taking place and attempt to minimize it. This requires walking through
your code, looking for long lived allocations and minimizing them if
possible.

On Mon, Jul 20, 2020 at 1:22 PM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma  wrote:
>
>> Hi All, i am running the same batch job in my two separate spark
>> clusters. In one of the clusters it is showing GC warning  on spark -ui
>> under executer tag. Garbage collection is taking longer time around 20 %
>> while in another cluster it is under 10 %. I am using the same
>> configuration in my spark submit and using G1GC .
>>
>> Please let me know what could be the reason for GC slowness.
>>
>>
>> Thanks
>> Amit
>>
>


Insert overwrite using select within same table

2020-07-20 Thread Utkarsh Jain
Hello community,

I am not sure whether using 'insert overwrite using select within the same
table' via Spark SQL is a good approach, or whether there is a better
alternative approach for this.

Thanks
Utkarsh


Re: Future timeout

2020-07-20 Thread Terry Kim
"spark.sql.broadcastTimeout" is the config you can use:
https://github.com/apache/spark/blob/fe07521c9efd9ce0913eee0d42b0ffd98b1225ec/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L863
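For example (the value is in seconds; 600 is just an illustration):

// Raise the broadcast timeout for this session; the same key can be passed
// with --conf spark.sql.broadcastTimeout=600 at submit time.
spark.conf.set("spark.sql.broadcastTimeout", "600")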

Thanks,
Terry

On Mon, Jul 20, 2020 at 11:20 AM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma  wrote:
>
>> Hi, sometimes my spark streaming job throw this exception  Futures timed
>> out after [300 seconds].
>> I am not sure where is the default timeout configuration. Can i increase
>> it. Please help.
>>
>>
>>
>> Thanks
>> Amit
>>
>>
>>
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
>> [300 seconds]
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>> at
>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>> at
>> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>> at
>> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257)
>> at
>> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
>> at
>> org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
>> at
>> org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
>> at
>> org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101)
>> at
>> org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
>> at
>> org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200)
>> at
>> org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
>> at
>> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
>> at
>> org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
>>
>


Re: Insert overwrite using select within same table

2020-07-20 Thread Umesh Bansal
You can use a temp table to insert the data and then use the temp table to
insert back into the main table.
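For example, something along these lines (a sketch; the table names are
placeholders):

// Stage the selected/transformed rows, then overwrite the original table from the copy.
spark.sql("CREATE TABLE events_staging AS SELECT * FROM events WHERE /* condition */ true")
spark.sql("INSERT OVERWRITE TABLE events SELECT * FROM events_staging")
spark.sql("DROP TABLE events_staging")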

On Tue, 21 Jul 2020 at 12:01 AM, Utkarsh Jain  wrote:

> Hello community,
>
> I am not sure that using 'insert overwrite using select within same table'
> via spark SQL is a good approach or is there any other best  alternative
> approach to do this.
>
> Thanks
> Utkarsh
>


Re: Needed some best practices to integrate Spark with HBase

2020-07-20 Thread YogeshGovi
I also need good docs on this, especially on integrating PySpark with Hive to
read tables from HBase.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org