Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Ranju Jain
Hi,

I need to write all executor pods' data to some common location which can be 
accessed and retrieved by the driver pod.
I was first planning to go with NFS, but I think a shared volume is equally good.
Please suggest: is there any major drawback in using a shared volume instead of 
NFS when many pods are writing to the same volume [ReadWriteMany]?

Regards
Ranju


Re: Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Mich Talebzadeh
Ok this is on Google Cloud correct?





RE: Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Ranju Jain
Hi Mich,

No, it is not Google Cloud. It is simply Kubernetes deployed on a bare-metal 
platform.
I am not clear on the pros and cons of a shared volume vs. NFS for ReadWriteMany.
Since NFS is a network file system [remote], I can see that a shared volume 
should be preferable, but I don't know the other side [drawbacks].

Regards
Ranju


spark on k8s driver pod exception

2021-03-11 Thread yxl040840219






When running the code on k8s, the driver pod throws an AnalysisException, but 
spark-submit keeps running. How can I get the exception and stop the pods?


val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = (0 until 10).toDF("id").selectExpr("id % 5 as key", "id%10 as value")
  .groupBy("key").agg(count("value1").as("cnt"))
df.show()
spark.stop()


bin/spark-submit \
--master k8s://https://localhost:9443 \
--deploy-mode cluster \
--name wordcount \
--class k8s.WordCount \
--conf spark.kubernetes.container.image=rspark:v3.1.1 \
--conf spark.kubernetes.container.image.pullPolicy=IfNotPresent \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.file.upload.path=hdfs://localhost:8020/data/spark \
/data/spark-example-1.0.0.jar

Re: Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Mich Talebzadeh
Well, your mileage varies, so to speak.

The only way to find out is to set up an NFS mount and test it.
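
For example, a quick way to try it (a rough sketch; the server name, export
path, mount point, and sizes below are placeholders):

sudo mount -t nfs nfs-server.example.com:/export/spark-shared /mnt/spark-shared
# sequential write test that bypasses the page cache
dd if=/dev/zero of=/mnt/spark-shared/testfile bs=1M count=1024 oflag=direct
# sequential read test
dd if=/mnt/spark-shared/testfile of=/dev/null bs=1M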


The performance will depend on the mounted file system and the amount of
cache it has.


File cache is important for reads, and if you are going to do random writes
(as opposed to sequential writes), you can stripe the volume (RAID 0)
for better performance.


Do you have a UNIX admin who can help you out as well?
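
On the Spark side, once the shared storage is exposed as a ReadWriteMany
PersistentVolumeClaim (NFS-backed or otherwise), both the driver and the
executors can mount it through spark-submit configuration. A rough sketch,
assuming a pre-created claim; the claim name, mount path, and master URL are
placeholders:

bin/spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.shared.options.claimName=shared-rwx-pvc \
  --conf spark.kubernetes.driver.volumes.persistentVolumeClaim.shared.mount.path=/shared \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.shared.options.claimName=shared-rwx-pvc \
  --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.shared.mount.path=/shared \
  ... <application jar and remaining options>

Note that a hostPath volume is local to each node, so for executors spread
across nodes a network-backed ReadWriteMany volume (NFS, CephFS, etc.) is the
usual choice.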


HTH




RE: Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Ranju Jain
Yes, there is a team, but I have not contacted them yet.
I am trying to understand this at my end.

I understood the point you mentioned below.

Do you have any references or links where I can read up on shared volumes?

Regards
Ranju



Re: Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Mich Talebzadeh
I don't have any specific reference. However, you can do a Google search.

Best to ask the Unix team. They can do all that themselves.

HTH





Re: spark on k8s driver pod exception

2021-03-11 Thread Attila Zsolt Piros
For getting the logs, please read the Accessing Logs part of the *Running
Spark on Kubernetes* page.

For stopping and generic management of the Spark application, please read the
Spark Application Management section, where you find this example:

$ spark-submit --kill spark:spark-pi* --master  k8s://https://192.168.2.8:8443
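
For example (the namespace, driver pod name, application name, and master URL
below are placeholders), the driver log and the application status can be
pulled like this:

$ kubectl -n <namespace> logs -f <driver-pod-name>

$ spark-submit --status "spark:<app-name>*" --master k8s://https://<k8s-apiserver>:6443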





Re: spark on k8s driver pod exception

2021-03-11 Thread Attila Zsolt Piros
> but  the spark-submit log still  running

Set the "spark.kubernetes.submission.waitAppCompletion" config to false to
change that. As the doc says:

"spark.kubernetes.submission.waitAppCompletion" : In cluster mode, whether
to wait for the application to finish before exiting the launcher process.
When changed to false, the launcher has a "fire-and-forget" behavior when
launching the Spark job.
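
For example, re-submitting with just that one extra line added to the original
command (everything else unchanged):

bin/spark-submit \
--master k8s://https://localhost:9443 \
--deploy-mode cluster \
--conf spark.kubernetes.submission.waitAppCompletion=false \
... <rest of the original options> \
/data/spark-example-1.0.0.jar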



RE: Spark on Kubernetes | 3.0.1 | Shared Volume or NFS

2021-03-11 Thread Ranju Jain
Ok!

Thanks for all guidance :-)

Regards
Ranju



Re: FlatMapGroupsWithStateFunction is called thrice - Production use case.

2021-03-11 Thread Jungtaek Lim
Hi,

Could you please provide the Spark version?

Also, it would be very helpful if you could provide a simple reproducer, e.g.
by placing a project that can simply be built (mvn, gradle, or sbt) into your
GitHub repository, plus the set of input data needed to see the behavior. It's
worth knowing that others aren't interested in your own code even if they are
interested in the problematic behavior itself, so it'd be nice if you can
minimize the hurdle to debugging.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin  wrote:

> Hello,
>
> I have a use case where I need to read (non-correlated) events from a
> source Kafka topic, then correlate them and push them forward to another target topic.
>
> I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction along
> with GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I
> apply some correlation logic on the grouped events and push the correlated
> events forward to another topic (via foreachBatch). Non-correlated events are stored
> in the state until they are correlated by a future set of events.
>
> With this scenario, when I push a single event to the source topic, I see it
> come three times to the FlatMapGroupsWithStateFunction (at separate timestamps)
> but only once in the foreachBatch processor (which is good).
>
> The same event arriving thrice in the FlatMapGroupsWithStateFunction is a
> problem, as it causes issues with my correlation logic.
>
> Can someone help me understand why it is seen thrice
> in the FlatMapGroupsWithStateFunction?
>
> Code snippets are shown below. Please let me know what is missing and how
> I can solve this.
>
> thanks,
> Robin Kuttaiah
>
> *StreamQuery*
>
> Dataset<MilestoneEvent> sessionUpdates = null;
> FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
>     new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);
> try {
>   sessionUpdates = idFedKafkaEvents
>       .groupByKey(
>           new MapFunction<Row, String>() {
>             private static final long serialVersionUID = -797571731893988577L;
>             @Override
>             public String call(Row event) {
>               return event.getAs("EVENT_MODEL_ID_COL");
>             }
>           },
>           Encoders.STRING())
>       .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
>           Encoders.bean(IdentifierConnector.class),
>           Encoders.bean(MilestoneEvent.class),
>           GroupStateTimeout.ProcessingTimeTimeout());
> } catch (Exception oException) {
>   // log and throw back exception
> }
>
> ForeachBatchProcessor oForeachBatch = new ForeachBatchProcessor(m_InsightDeployment,
>     m_InsightEvent, m_strQueryName);
> DataStreamWriter<MilestoneEvent> events = sessionUpdates.writeStream()
>     .queryName(queryName)
>     .outputMode("append")
>     .trigger(Trigger.ProcessingTime("5 seconds"))
>     .option("checkpointLocation", checkpointLocation)
>     .foreachBatch(oForeachBatch);
>
>
> *FlatMapGroupsWithStateFunction:*
>
> public class FlatMapIdFedGroupFunction implements
>     FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {
>
>   public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
>   }
>
>   @Override
>   public Iterator<MilestoneEvent> call(String key, Iterator<Row> events,
>       GroupState<IdentifierConnector> state) throws Exception {
>     List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
>     IdentifierConnector session = null;
>     IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key + "  "
>         + System.currentTimeMillis()); // Called thrice
>     if (!state.exists()) {
>       session = new IdentifierConnector();
>     } else {
>       session = state.get();
>     }
>     while (events.hasNext()) {
>       Row event = events.next();
>       MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
>       outputEvents.add(mEventCurr);
>       IdFederationUtil.write(".." + mEventCurr.getMilestoneId()); // Called thrice
>       break;
>     }
>     return outputEvents.iterator();
>   }
> }
>
>
> *ForEachBatchFunction:*
>
> public class ForeachBatchProcessor implements
>     VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {
>
>   private static final long serialVersionUID = 1L;
>
>   public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
>       InsightEvent in_oInsightEvent, String in_strQueryName) {
>   }
>
>   public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
>       throws Exception {
>     if (in_Rows.count() == 0L) {
>       return;
>     }
>     IdFederationUtil.write("Processing batch " + in_lBatchID + "  " + in_Rows.count());
>     List<MilestoneEvent> events = in_Rows.collectAsList();
>     for (MilestoneEvent m : events) {
>       IdFederationUtil.write("..BATCH " + m.getMilestoneId());
>     }
>   }
> }
>
>
>


Re: Single executor processing all tasks in spark structured streaming kafka

2021-03-11 Thread Sachit Murarka
Hi Kapil,

Thanks for the suggestion. Yes, it worked.

Regards
Sachit

On Tue, 9 Mar 2021, 00:19 Kapil Garg,  wrote:

> Hi Sachit,
> What do you mean by "spark is running only 1 executor with 1 task"?
> Did you submit the Spark application with multiple executors, but only 1 is
> being used and the rest are idle?
> If that's the case, it might happen due to the spark.locality.wait
> setting, which defaults to 3s. This makes Spark wait up to 3s for tasks to
> finish on an executor before submitting the next batch
> to other executors. This happens due to Spark's preference for cached
> Kafka consumers.
>
> And regarding 1 task doing all the processing: please check whether your
> Kafka topic has only 1 partition. Spark draws its parallelism from the
> number of partitions in the Kafka topic. Once you have loaded the data from
> the partitions, you can choose to repartition the batch so it is processed by
> multiple tasks (see the sketch below).
>
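> For illustration, a rough PySpark sketch of both suggestions (untested; the
> partition count, the spark.locality.wait value, and the checkpoint path are
> placeholders, and KAFKA_URL, KAFKA_TOPICS and process_events come from your
> original snippet):
>
> from pyspark.sql import SparkSession
>
> spark = (SparkSession.builder
>          .config("spark.locality.wait", "0s")   # don't hold tasks for cached-consumer locality
>          .getOrCreate())
>
> df = (spark.readStream.format("kafka")
>       .option("kafka.bootstrap.servers", KAFKA_URL)
>       .option("subscribe", KAFKA_TOPICS)
>       .load()
>       .selectExpr("CAST(value AS STRING)"))
>
> # Spread each micro-batch across more tasks than the topic has partitions.
> query = (df.repartition(8)
>          .writeStream.foreach(process_events)
>          .option("checkpointLocation", "/opt/checkpoint")
>          .trigger(processingTime="30 seconds")
>          .start())
>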
> On Mon, Mar 8, 2021 at 10:57 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am using Spark 3.0.1 Structured Streaming with PySpark.
>>
>> The problem is that Spark is running only 1 executor with 1 task. The
>> following is a summary of what I am doing.
>>
>> Can anyone help with why only 1 executor is used?
>>
>> def process_events(event):
>>     fetch_actual_data()
>>     # many more steps
>>
>> def fetch_actual_data():
>>     # applying operation on actual data
>>
>> df = spark.readStream.format("kafka") \
>>     .option("kafka.bootstrap.servers", KAFKA_URL) \
>>     .option("subscribe", KAFKA_TOPICS) \
>>     .option("startingOffsets", START_OFFSET) \
>>     .load() \
>>     .selectExpr("CAST(value AS STRING)")
>>
>> query = df.writeStream.foreach(process_events) \
>>     .option("checkpointLocation", "/opt/checkpoint") \
>>     .trigger(processingTime="30 seconds").start()
>>
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
>
> --
> Regards
> Kapil Garg
>
>


Re: Detecting latecomer events in Spark structured streaming

2021-03-11 Thread Jungtaek Lim
Hi,

If I remember correctly, Spark does not provide the watermark value for the
current batch through the public API. That said, if you're dealing with
"event time" (and I guess that is your case, since you worry about late
events), then unless you employ a new logical/physical plan to expose
watermarks to the user-level function, it's not possible to do what you plan
to do.

I've tried a similar thing to count the number of late events by making
changes to the Spark codebase (see https://github.com/apache/spark/pull/24936)
- my initial goal was to provide a side output of late events so that end
users could deal with those events outside of the query, but I soon realized
it's non-trivial and just took the simplest approach at that time. (There are
still possible ideas, e.g. sending them to the driver via RPC, assuming these
events are a "minority", but nothing concluded that it was worth the effort.
If your business logic requires it, you could hack on this yourself and share
it if you succeed.)

I'd skip answering the individual questions, as explained: you'd be stuck even
before reaching them.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, Mar 9, 2021 at 6:49 AM Sergey Oboguev  wrote:

> I have a Spark structured streaming based application that performs
> window(...) construct followed by aggregation.
>
> This construct discards latecomer events that arrive past the watermark. I
> need to be able to detect these late events to handle them out-of-band.
> The application maintains a raw store of all received events and can
> re-aggregate a particular time interval for a particular device in a
> secondary batch mode, as long as it knows that this interval has to be
> re-aggregated, i.e. contains latecomer data that was discarded by
> structured streaming due to watermark.
>
> I am trying to come up with a way to perform such detection.
>
> One approach would perhaps be to insert an additional stage before
> window(...) -- a stage that would monitor all events received by the
> stream, look at their timestamps, and predict which events will be
> discarded by window(...) due to watermark. Such events can then be handled
> outside of Spark structured streaming. The stage can be based on
> Dataset.foreach, Dataset.filter or Dataset.map that will pass all events
> through, but also monitor the events and if a latecomer condition is
> detected, then issue a side channel notification that will cause data for
> the specified device and interval be re-aggregated later from raw event
> storage, out of stream.
>
> I have a couple of questions related to the feasibility of such a
> construct.
>
> Q1:
>
> Can data behind the window(...) be shared by multiple executors or nodes,
> or is it owned by one executor at any given time? If it is shared, it would
> appear that local monitoring of passing timestamps would be insufficient,
> since it lacks global context.
>
> Q2:
>
> To monitor the stream, the stage needs to maintain a context. The context
> can be checkpointed periodically in an external store, but I do not want to
> persist/readback the context for every microbatch (or, in the foreach case,
> for every individual event). I want to checkpoint the context infrequently,
> and maintain it across microbatches just in memory.
>
> Which brings a question... The handler function inside the stage (called
> by foreach, map, or filter) needs to refer to the context object, yet it is
> unclear how to make such a reference.
>
> I could attach a context to the stream via some global map object
> (translating stream->context), but handler functions for Dataset.foreach,
> Dataset.map, or Dataset.filter do not receive a stream handle, and thus
> have no key to use for translation back to context object.
>
> The association cannot be done via a TLS (per-thread) variable too, since
> Spark can internally create threads for stream processing and they won't
> inherit the parent TLS (and also may not even have the thread that started
> the stream as their parent thread).
>
> This appears to leave Java static variable as the only option for the
> context pointer, limiting the model to one active stream per executor. But
> is it guaranteed by Spark specification that different executors will run
> in different JVM instances?
>
> Thanks for advice.
>