Reading Hadoop Archive from Spark

2020-04-27 Thread To Quoc Cuong
Hello,

After archiving Parquet files into a HAR (Hadoop Archive), the archive has the following layout:

foo.har/_masterindex   // stores hashes and offsets
foo.har/_index         // stores file statuses
foo.har/part-[0..n]    // stores the actual Parquet files, concatenated sequentially

So we can access the Parquet data inside the HAR like this:

spark.read.parquet("hdfs:///user/cyber/dataset/HARFolder/foo.har/")

or like this:

spark.read.parquet("hdfs:///user/cyber/dataset/HARFolder/foo.har/part-0")
If the HAR contains only one Parquet file, we can read it entirely. But if it
contains more than one, we can read only the first and the others are ignored.
This is because the archiving process treats Parquet files as opaque binary
files and simply appends them into the part-0 file, so only the header of the
first Parquet file sits at the head of part-0; the headers of the other Parquet
files land somewhere in the middle. When we call
spark.read.parquet("hdfs:///foo.har/part-0"), Spark scans only the header of
part-0, which is also the header of the first Parquet file, and skips the rest.

For example, if foo.har contains only tintin_milou.parquet, we can read it
successfully. But if foo2.har contains two Parquet files (tintin_milou.parquet
and tintin_milou2.parquet), we can read only tintin_milou.parquet and fail to
read tintin_milou2.parquet. Furthermore, if foo3.har contains two Parquet files
with different schemas (like tintin_milou.parquet and cdr.parquet), we cannot
read either of them.
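The concatenation effect above can be sketched with raw bytes. This is a simplified, illustrative sketch, not real Parquet data (real Parquet files do begin and end with the 4-byte magic PAR1 and store a 4-byte little-endian footer length just before the trailing magic; the "schema" payloads and filenames here are made up):

```python
import struct

def fake_parquet(data: bytes, footer: bytes) -> bytes:
    # Simplified Parquet layout: leading magic, row data, footer metadata,
    # 4-byte little-endian footer length, trailing magic.
    return b"PAR1" + data + footer + struct.pack("<I", len(footer)) + b"PAR1"

f1 = fake_parquet(b"rows-of-tintin_milou", b"schema-1")
f2 = fake_parquet(b"rows-of-tintin_milou2", b"schema-2")

# This mimics what `hadoop archive` does with part-0: plain concatenation.
part0 = f1 + f2

# part-0 still looks like one valid Parquet file from the outside...
assert part0.startswith(b"PAR1") and part0.endswith(b"PAR1")

# ...but a footer-driven reader that seeks to the end of the file
# recovers the metadata of only ONE of the archived files; the other
# file's footer is buried mid-stream where no reader looks for it.
(footer_len,) = struct.unpack("<I", part0[-8:-4])
footer = part0[-8 - footer_len:-8]
print(footer)
```

Whichever end the reader trusts, it sees the metadata of a single archived file, which matches the observed behavior of reading only one Parquet file per part file.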

We CANNOT access the original Parquet files in either of these two ways:
spark.read.parquet("hdfs:///user/cyber/dataset/HARFolder/foo2.har/tintin_milou.parquet")
spark.read.parquet("har:///user/cyber/dataset/HARFolder/foo2.har/tintin_milou.parquet")
even though we can list the original Parquet files with Hadoop:
hadoop dfs -ls har:///user/cyber/dataset/HARFolder/foo2.har
Output: (assume that tintin_milou.parquet and tintin_milou2.parquet are 
archived into foo2.har)
har:///user/cyber/dataset/HARFolder/foo2.har/tintin_milou.parquet
har:///user/cyber/dataset/HARFolder/foo2.har/tintin_milou2.parquet

So does anyone know how to read multiple Parquet files inside a HAR with Spark?
Thanks




2020-04-27 Thread Hongbin Liu
unsubscribe





Re: [pyspark] Load a master data file to spark ecosystem

2020-04-27 Thread Arjun Chundiran
Hi Sonal,

The tree file is a file in radix tree format. tree_lookup_value is a
function which looks up the value for a particular key in that tree.

Thanks,
Arjun

On Sat, Apr 25, 2020 at 10:28 AM Sonal Goyal  wrote:

> How does your tree_lookup_value function work?
>
> Thanks,
> Sonal
> Nube Technologies 
>
> 
>
>
>
>
> On Fri, Apr 24, 2020 at 8:47 PM Arjun Chundiran 
> wrote:
>
>> Hi Team,
>>
>> I have asked this question in stack overflow
>> 
>> and I didn't really get any convincing answers. Can somebody help me to
>> solve this issue?
>>
>> Below is my problem
>>
>> While building a log processing system, I came across a scenario where I
>> need to look up data from a tree file (Like a DB) for each and every log
>> line for corresponding value. What is the best approach to load an external
>> file which is very large into the spark ecosystem? The tree file is of size
>> 2GB.
>>
>> Here is my scenario
>>
>>    1. I have a file containing a huge number of log lines.
>>    2. Each log line needs to be split by a delimiter into 70 fields.
>>    3. I need to look up data from the tree file for one of the 70 fields
>>    of a log line.
>>
>> I am using Apache Spark Python API and running on a 3 node cluster.
>>
>> Below is the code which I have written. But it is really slow
>>
>> def process_logline(line, tree):
>>     row_dict = {}
>>     line_list = line.split(" ")
>>     row_dict["host"] = tree_lookup_value(tree, line_list[0])
>>     new_row = Row(**row_dict)
>>     return new_row
>>
>> def run_job(vals):
>>     spark.sparkContext.addFile('somefile')
>>     tree_val = open(SparkFiles.get('somefile'))
>>     lines = spark.sparkContext.textFile("log_file")
>>     converted_lines_rdd = lines.map(lambda l: process_logline(l, tree_val))
>>     log_line_rdd = spark.createDataFrame(converted_lines_rdd)
>>     log_line_rdd.show()
>>
>> Basically I need some option to load the file into worker memory once and
>> keep using it for the entire job lifetime via the Python API.
>>
>> Thanks in advance
>> Arjun
>>
>>
>>
>>
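The usual fix for the pattern quoted above is to load the lookup structure once per worker rather than once per record; in PySpark that is typically a broadcast variable (sc.broadcast) or a per-partition load inside mapPartitions. Below is a stdlib-only sketch of the caching idea (no Spark; the tiny stand-in tree, the counter, and all names are illustrative, not the poster's actual code):

```python
load_count = 0   # counts how often the expensive load actually runs
_tree = None     # per-process cache, analogous to per-executor state

def load_tree(path):
    # Stand-in for the expensive 2 GB radix-tree load.
    global load_count
    load_count += 1
    return {"10.0.0.1": "hostA", "10.0.0.2": "hostB"}

def get_tree(path):
    # Cached accessor: the tree is built at most once per process,
    # no matter how many log lines are processed.
    global _tree
    if _tree is None:
        _tree = load_tree(path)
    return _tree

def process_logline(line, path="somefile"):
    fields = line.split(" ")
    return {"host": get_tree(path).get(fields[0])}

logs = ["10.0.0.1 GET /", "10.0.0.2 GET /", "10.0.0.1 POST /"]
rows = [process_logline(l) for l in logs]
print(rows, load_count)  # three rows, but only one load
```

In Spark the same effect comes from broadcasting the tree once and referencing the broadcast value inside the map function, instead of opening the file on the driver and shipping the handle into a closure.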


Re: [pyspark] Load a master data file to spark ecosystem

2020-04-27 Thread Arjun Chundiran
Hi Roland,

As per my understanding, while creating the DataFrame, Spark splits the
file into partitions and distributes them. But my tree file contains
data structured in radix tree format, and tree_lookup_value is the method
we use to look up a specific key in that tree. So I don't think my tree
file will work if it is split into partitions.

NB: I am new to spark. Please correct me if I am wrong

Thanks,
Arjun

On Sat, Apr 25, 2020 at 1:24 PM Roland Johann 
wrote:

> You can read both the logs and the tree file into dataframes and join
> them. That way Spark can distribute the relevant records, or even broadcast
> the whole dataframe, to optimize the execution.
>
> Best regards
>
> Sonal Goyal  schrieb am Sa. 25. Apr. 2020 um 06:59:
>
>> How does your tree_lookup_value function work?
>>
>> Thanks,
>> Sonal
>
> --
> Roland Johann
> Software Developer/Data Engineer
>
> phenetic GmbH
> Lütticher Straße 10, 50674 Köln, Germany
>
> Mobil: +49 172 365 26 46
> Mail: roland.joh...@phenetic.io
> Web: phenetic.io
>
> Handelsregister: Amtsgericht Köln (HRB 92595)
> Geschäftsführer: Roland Johann, Uwe Reimann
>
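Roland's suggestion amounts to a map-side (broadcast) join: replicate the small lookup table to every worker and enrich each log row locally, rather than calling a lookup function row by row. A plain-Python sketch of the idea follows (no Spark; the IPs, hostnames and field layout are illustrative):

```python
# The "broadcast" side: small enough to replicate to every worker.
lookup = {"10.0.0.1": "hostA", "10.0.0.2": "hostB"}

logs = [
    "10.0.0.1 GET /index",
    "10.0.0.2 GET /about",
]

def enrich(line):
    fields = line.split(" ")
    # The Spark SQL equivalent would be roughly:
    #   logs_df.join(broadcast(lookup_df), "ip")
    # using pyspark.sql.functions.broadcast on the small dataframe.
    return {"ip": fields[0], "host": lookup.get(fields[0]), "path": fields[2]}

joined = [enrich(l) for l in logs]
```

The point of the join formulation is that Spark's optimizer, not the user code, decides how to distribute the small side, which avoids serializing a driver-side object (such as an open file handle) into every task.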


Re: [pyspark] Load a master data file to spark ecosystem

2020-04-27 Thread Arjun Chundiran
Hi Gourav,

I am first creating RDDs and converting them into DataFrames, since I need to
map in the values from my tree file while building the DataFrames.

Thanks,
Arjun

On Sun, Apr 26, 2020 at 9:33 PM Gourav Sengupta 
wrote:

> Hi,
>
> Why are you using RDDs? And how are the files stored in terms of
> compression?
>
> Regards
> Gourav
>


Re: [pyspark] Load a master data file to spark ecosystem

2020-04-27 Thread Arjun Chundiran
Below is the reason why I didn't use dataframes directly.
As per my understanding, while creating the DataFrame, Spark splits the
file into partitions and distributes them. But my tree file contains
data structured in radix tree format, and tree_lookup_value is the method
we use to look up a specific key in that tree. So I don't think my tree
file will work if it is split into partitions.

NB: I am new to spark. Please correct me if I am wrong

Thanks,
Arjun

On Sun, Apr 26, 2020 at 10:58 PM Edgardo Szrajber 
wrote:

> In the code below you are impeding Spark from doing what it is meant to do.
> As mentioned below, the best (and easiest to implement) approach would be
> to load each file into a dataframe and join between them.
> Even doing a keyed join with RDDs would be better, but in your case you are
> forcing a one-by-one calculation.
> Bentzi
>
>
>
>
> On Sun, Apr 26, 2020 at 19:03, Gourav Sengupta
>  wrote:
> Hi,
>
> Why are you using RDDs? And how are the files stored in terms if
> compression?
>
> Regards
> Gourav
>


Unsubscribe

2020-04-27 Thread Natalie Ruiz
Unsubscribe



SparkLauncher reliability and scalability

2020-04-27 Thread mhd wrk
We are using SparkLauncher and SparkAppHandle.Listener to launch Spark
applications from a Java web application and listen to their state changes.
Our observation is that, as the number of concurrent jobs grows, some state
changes are sometimes not reported (e.g. some applications never report a
final state even when the corresponding Spark job is marked FINISHED in the
YARN UI). I'm wondering if there are any guidelines or limits on launching a
potentially large number of long-running, concurrent Spark jobs?

Thanks,


[Announcement] Analytics Zoo 0.8 release

2020-04-27 Thread Jason Dai
Hi all,



We are happy to announce the 0.8 release of Analytics Zoo, a unified Data
Analytics and AI platform for distributed TensorFlow, Keras, PyTorch, BigDL,
Apache Spark/Flink and Ray. Some of the notable new features in this release
are:

   - First official release of AutoML support
   - Improved support for running Analytics Zoo on K8s
   - Improvements to tfpark (distributed TensorFlow on Spark), including
   seamless scale-out of tf.data.Dataset pipelines on Spark, support for
   Spark Dataframes in TFDataSet, and support for pre-made TensorFlow
   Estimators
   - Improvements to Cluster Serving (automatically distributed serving of
   DL models), including support for performance mode and better TensorBoard
   integration
   - Improvements to time series analysis (including the new MTNet model and
   project Zouwu)
   - Upgraded OpenVINO support to 2020 R1

For more details, you may refer to the project website at
https://github.com/intel-analytics/analytics-zoo/ and the Getting Started
page.



Thanks,

-Jason


Fwd: [Announcement] Analytics Zoo 0.8 release

2020-04-27 Thread Jason Dai
FYI :-)

-- Forwarded message -
From: Jason Dai 
Date: Tue, Apr 28, 2020 at 10:31 AM
Subject: [Announcement] Analytics Zoo 0.8 release
To: BigDL User Group 



[Structured Streaming] NullPointerException in long running query

2020-04-27 Thread lec ssmi
Hi:
  One of my long-running queries occasionally encountered the following
exception:


  Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 10 in stage 1.0 failed 4 times, most recent failure: Lost
> task 10.3 in stage 1.0 (TID 81, spark6, executor 1):
> java.lang.NullPointerException
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
> at
> org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:475)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:473)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org
> $apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:472)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
> at
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
> at
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org
> $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> ... 1 more



According to the exception stack, it seems to have nothing to do with the
logic of my code. Is this a Spark bug or something? The Spark version is
2.3.1.

Best
Lec Ssmi


Re: [Structured Streaming] NullPointerException in long running query

2020-04-27 Thread Jungtaek Lim
The root cause of the exception occurred on the executor side ("Lost task 10.3
in stage 1.0 (TID 81, spark6, executor 1)"), so you may need to check the
executor logs there.
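The host and executor to inspect can be parsed straight out of that failure line before pulling the logs (e.g. with `yarn logs -applicationId <appId>` on YARN). A small illustrative sketch:

```python
import re

# The failure line as it appears in the driver-side exception message.
msg = ("Lost task 10.3 in stage 1.0 (TID 81, spark6, executor 1): "
       "java.lang.NullPointerException")

# Extract task, stage, TID, host, and executor id from the standard
# "Lost task ... in stage ... (TID ..., host, executor ...)" wording.
m = re.search(r"Lost task (\S+) in stage (\S+) \(TID (\d+), (\w+), executor (\d+)\)", msg)
task, stage, tid, host, executor = m.groups()
print(host, executor)  # spark6 1
```

With host and executor id in hand, the executor's stderr log for that application is where the actual NullPointerException stack trace (with the user-code frames) will be.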

On Tue, Apr 28, 2020 at 2:52 PM lec ssmi  wrote:

> Hi:
>   One of my long-running queries occasionally encountered the following
> exception:
>
>
>
>
>
> According to the exception stack, it seems to have nothi