Re: Future timeout

2020-07-21 Thread Piyush Acharya
spark.conf.set("spark.sql.broadcastTimeout",  ##)
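
For example, a minimal sketch (the 1200-second value and the spark session variable are illustrative; the default is 300 seconds, which matches the timeout in the stack trace below):

// Raise the broadcast timeout (in seconds) above the 300-second default.
spark.conf.set("spark.sql.broadcastTimeout", 1200)

// Or, if the broadcast itself is the problem, disable automatic broadcast joins
// so Spark falls back to a shuffle join instead of timing out.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)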

On Mon, Jul 20, 2020 at 11:51 PM Amit Sharma  wrote:

> Please help on this.
>
>
> Thanks
> Amit
>
> On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma  wrote:
>
>> Hi, sometimes my Spark streaming job throws this exception: Futures timed
>> out after [300 seconds].
>> I am not sure where this default timeout is configured. Can I increase
>> it? Please help.
>>
>>
>>
>> Thanks
>> Amit
>>
>>
>>
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]
>>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>   at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
>>   at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
>>   at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>   at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
>>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
>>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257)
>>   at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
>>   at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
>>   at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
>>   at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
>>   at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
>>   at org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101)
>>   at org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121)
>>   at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
>>   at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
>>   at org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200)
>>   at org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224)
>>   at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
>>   at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
>>   at org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
>>
>


Re: schema changes of custom data source in persistent tables DataSourceV1

2020-07-20 Thread Piyush Acharya
Do you want to merge schemas when the schema of the incoming data changes?

spark.conf.set("spark.sql.parquet.mergeSchema", "true")

https://kontext.tech/column/spark/381/schema-merging-evolution-with-parquet-in-spark-and-hive
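
For example, a minimal sketch in Scala (the path is illustrative, not from the thread):

// Merge schemas for a single Parquet read; cheaper than enabling it globally.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/events")   // hypothetical location with part files that have differing schemas
merged.printSchema()         // prints the union of the part-file schemas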


On Mon, Jul 20, 2020 at 3:48 PM fansparker  wrote:

> Does anybody know if there is a way to get the persisted table's schema
> updated when the underlying custom data source schema is changed?
> Currently,
> we have to drop and re-create the table.
>
>


Re: Spark Structured Streaming keep on consuming usercache

2020-07-20 Thread Piyush Acharya
Can you try calling batchDF.unpersist() once the work in the foreachBatch loop is done?
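
For example, a minimal sketch in Scala (mirroring the code quoted below, with .start() and the unpersist added; the persist call is optional and only useful if the batch is reused):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val monitoring_stream = monitoring_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()                 // optional: cache only if the batch is used more than once
    if (!batchDF.isEmpty) batchDF.show()
    batchDF.unpersist()               // release this batch's cached blocks before the next trigger
  }
  .start()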

On Mon, Jul 20, 2020 at 3:38 PM Yong Yuan  wrote:

> It seems the following Structured Streaming code keeps consuming the
> usercache until all disk space is occupied.
>
> val monitoring_stream = monitoring_df.writeStream
>   .trigger(Trigger.ProcessingTime("120 seconds"))
>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>     if (!batchDF.isEmpty) batchDF.show()
>   }
>
>
> I did not even call batchDF.persist(). Do I really need to save/write
> batchDF somewhere to release the usercache?
>
> I also tried to call spark.catalog.clearCache() explicitly in a loop,
> which does not help solve this problem either.
>
> The attached figure also shows the cluster's capacity decreasing while this
> code runs.
>
>
>


Re: Spark UI

2020-07-19 Thread Piyush Acharya
https://www.youtube.com/watch?v=YgQgJceojJY (Xiao's video)

On Mon, Jul 20, 2020 at 8:03 AM Xiao Li  wrote:

> https://spark.apache.org/docs/3.0.0/web-ui.html is the official doc
> for Spark UI.
>
> Xiao
>
> On Sun, Jul 19, 2020 at 1:38 PM venkatadevarapu  wrote:
>
>> Hi,
>>
>> I'm looking for a tutorial/video/material that explains the content of the
>> various tabs in the Spark web UI.
>> Can someone direct me to the relevant info?
>>
>> Thanks
>>
>>
>>
>>
>>
>
>


Re: Schedule/Orchestrate spark structured streaming job

2020-07-19 Thread Piyush Acharya
Some workflow-engine options are compared here:
https://medium.com/@xunnan.xu/workflow-processing-engine-overview-2018-airflow-vs-azkaban-vs-conductor-vs-oozie-vs-amazon-step-90affc54d53b

A streaming query is effectively an infinitely running job, so you only need to
start it once, unless you are running it with Trigger.Once, in which case an
external scheduler can launch it periodically.
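
A minimal sketch of both modes in Scala (the input df, sink paths, and checkpoint locations are illustrative assumptions, not from the thread):

import org.apache.spark.sql.streaming.Trigger

// Continuously running query: submit it once and leave it running.
val continuous = df.writeStream
  .format("parquet")
  .option("path", "/out/stream")                 // hypothetical sink path
  .option("checkpointLocation", "/chk/stream")   // hypothetical checkpoint dir
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

// Trigger.Once query: process whatever is available, then stop, so an external
// scheduler (e.g. Airflow) can launch the whole job on a cron-like schedule.
val oneShot = df.writeStream
  .format("parquet")
  .option("path", "/out/batchlike")
  .option("checkpointLocation", "/chk/batchlike")
  .trigger(Trigger.Once())
  .start()
oneShot.awaitTermination()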

Regards,
..Piyush


On Sun, Jul 19, 2020 at 11:01 PM anbutech  wrote:

> Hi Team,
>
> I'm very new to Spark Structured Streaming. Could you please guide me on how
> to schedule/orchestrate a Spark Structured Streaming job? Is there any
> scheduler similar to Airflow? I know Airflow doesn't support streaming jobs.
>
> Thanks
> Anbu
>
>
>
>
>


Re: Overwrite Mode not Working Correctly in spark 3.0.0

2020-07-19 Thread Piyush Acharya
Can you please send the error message? It would be very helpful for getting to
the root cause.
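
For what it's worth, if it is the "Predicate references non-partition column 'json_feeds_flatten_data'" error quoted below, one possible (unconfirmed) cause is that the string partition value is not quoted inside the replaceWhere predicate, so it gets parsed as a column name. A hedged sketch, in Scala with illustrative variable names:

// Guess, not a confirmed diagnosis: string-typed partition values need quotes in
// the predicate; unquoted, json_feeds_flatten_data is read as a column reference.
val repWh = s"table_name='$tableName' AND y=$y AND m=$m AND d=$d AND h=$h"

finalDf.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", repWh)   // replace only the matching partitions
  .saveAsTable(targetTable)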

On Sun, Jul 19, 2020 at 10:57 PM anbutech  wrote:

> Hi Team,
>
> I'm facing weird behavior with a pyspark dataframe (Databricks Delta, Spark
> 3.0.0 supported).
>
> I have tried the two options below to write the processed dataframe into a
> delta table with respect to the partition columns in the table. Overwrite
> mode completely overwrites the whole table; I couldn't figure out why the
> dataframe fully overwrites it here.
>
> I'm also getting the following error while testing with option 2 below:
>
>
> Predicate references non-partition column 'json_feeds_flatten_data'. Only
> the partition columns may be referenced: [table_name, y, m, d, h];
>
> Could you please tell me why pyspark behaves like this? It would be very
> helpful to know the mistake here.
>
> sample partition column values:
> ---
>
> table_name='json_feeds_flatten_data'
> y=2020
> m=7
> d=19
> h=0
>
> Option 1:
>
> partition_keys=['table_name','y','m','d','h']
>
> (final_df
>     .withColumn('y', lit(y).cast('int'))
>     .withColumn('m', lit(m).cast('int'))
>     .withColumn('d', lit(d).cast('int'))
>     .withColumn('h', lit(h).cast('int'))
>     .write
>     .partitionBy(partition_keys)
>     .format("delta")
>     .mode('overwrite')
>     .saveAsTable(target_table)
> )
>
> Option 2:
>
> rep_wh = 'table_name={} AND y={} AND m={} AND d={} AND h={}'.format(table_name, y, m, d, h)
>
> (final_df
>     .withColumn('y', lit(y).cast('int'))
>     .withColumn('m', lit(m).cast('int'))
>     .withColumn('d', lit(d).cast('int'))
>     .withColumn('h', lit(h).cast('int'))
>     .write
>     .format("delta")
>     .mode('overwrite')
>     .option('replaceWhere', rep_wh)
>     .saveAsTable(target_table)
> )
>
> Thanks
>
>
>
>
>


Re: OOM while processing read/write to S3 using Spark Structured Streaming

2020-07-19 Thread Piyush Acharya
Please try the maxBytesPerTrigger option; the files are probably big enough to
crash the JVM.
Please give some info on the executors and the files (size, etc.).
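
For reference, a minimal sketch of rate-limiting the file source in Scala (values are illustrative; gzipped files are not splittable, so each file is decompressed by a single task, which is why large files can blow up executor memory):

// Pull fewer files into each micro-batch; 50 is an illustrative value.
val inputDS = sparkSession.readStream
  .schema(jsonSchema)                      // jsonSchema as in the code quoted below
  .option("mode", "PERMISSIVE")
  .option("maxFilesPerTrigger", "50")
  // .option("maxBytesPerTrigger", "1g")   // size-based cap; availability depends on the source and Spark version
  .json(inputPath + "/*")                  // inputPath as in the code quoted below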

Regards,
..Piyush

On Sun, Jul 19, 2020 at 3:29 PM Rachana Srivastava  wrote:

> *Issue:* I am trying to periodically process 5000+ gzipped JSON files from
> S3 using Structured Streaming code.
>
> *Here are the key steps:*
>
> 1. Read the JSON schema and broadcast it to executors.
>
> 2. Read the stream:
>
>    Dataset inputDS = sparkSession.readStream().format("text")
>        .option("inferSchema", "true").option("header", "true")
>        .option("multiLine", true).schema(jsonSchema).option("mode", "PERMISSIVE")
>        .json(inputPath + "/*");
>
> 3. Process each file in a map:
>
>    Dataset ds = inputDS.map(x -> { ... }, Encoders.STRING());
>
> 4. Write output to S3:
>
>    StreamingQuery query = ds.coalesce(1).writeStream()
>        .outputMode("append").format("csv") ... .start();
>
> *maxFilesPerTrigger* is set to 500, so I was hoping the stream would pick
> only that many files to process. Why are we getting OOM? If we have more
> than 3500 files, the system crashes with OOM.
>
>


subscribe

2020-07-18 Thread Piyush Acharya