Thanks Eve for response.

Yes I know we can use broadcast for smaller datasets,I increased the
threshold (4Gb) for the same then also it did not work. and the df3 is
somewhat greater than 2gb.

Trying by removing broadcast as well.. Job is running since 1 hour. Will
let you know.


Thanks
Sachit

On Wed, 7 Oct 2020, 00:41 Eve Liao, <evelia...@gmail.com> wrote:

> How many rows does df3 have? Broadcast joins are a great way to append
> data stored in relatively *small* single source of truth data files to
> large DataFrames. DataFrames up to 2GB can be broadcasted so a data file
> with tens or even hundreds of thousands of rows is a broadcast candidate.
> Your broadcast variable is probably too large.
>
> On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka <connectsac...@gmail.com>
> wrote:
>
>> Hello Users,
>>
>> I am facing an issue in spark job where I am doing row number() without
>> partition by clause because I need to add sequential increasing IDs.
>> But to avoid the large spill I am not doing row number() over the
>> complete data frame.
>>
>> Instead I am applying monotically_increasing id on actual data set ,
>> then create a new data frame from original data frame which will have
>> just monotically_increasing id.
>>
>> So DF1 = All columns + monotically_increasing_id
>> DF2 = Monotically_increasingID
>>
>> Now I am applying row number() on DF2 since this is a smaller dataframe.
>>
>> DF3 = Monotically_increasingID + Row_Number_ID
>>
>> Df.join(broadcast(DF3))
>>
>> This will give me sequential increment id in the original Dataframe.
>>
>> But below is the stack trace.
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o180.parquet.
>> : org.apache.spark.SparkException: Job aborted.
>>         at
>> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
>>         at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>         at
>> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>         at
>> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>         at
>> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>         at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>         at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>         at
>> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>         at
>> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>         at
>> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>         at
>> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>         at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>         at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>         at
>> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>         at
>> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>         at
>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>         at
>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>         at
>> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>>         at
>> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>>         at py4j.Gateway.invoke(Gateway.java:282)
>>         at
>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>>         at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>         at py4j.GatewayConnection.run(GatewayConnection.java:238)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.spark.SparkException: Could not execute broadcast
>> in 1000 secs. You can increase the timeout for broadcasts via
>> spark.sql.broadcastTimeout or disable broadcast join by setting
>> spark.sql.autoBroadcastJoinThreshold to -1
>>
>> Initially this threshold was 300. I already increased it.
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>

Reply via email to