Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Mich Talebzadeh
 {str(e)}")
>> retries += 1  # Increment the retry count
>> time.sleep(60)  # Sleep before retrying
>> else:
>> break  # Break out of the loop if the job completes successfully
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed. It is essential to note
>> that, as with any advice, "one test result is worth one-thousand
>> expert opinions" (Wernher von Braun
>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>
>>
>> On Mon, 19 Feb 2024 at 19:21, Mich Talebzadeh 
>> wrote:
>>
>>> I am not aware of any configuration parameter in Spark classic to
>>> limit executor creation. Because of fault tolerance, Spark will try to
>>> recreate failed executors. I am not that familiar with the Spark operator
>>> for k8s; there may be something there.
>>>
>>> Have you considered custom monitoring and handling within Spark itself,
>>> using something like max_retries = 5?
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Dad | Technologist | Solutions Architect | Engineer
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed. It is essential to note
>>> that, as with any advice, "one test result is worth one-thousand
>>> expert opinions" (Wernher von Braun
>>> <https://en.wikipedia.org/wiki/Wernher_von_Braun>).
>>>
>>>
>>> On Mon, 19 Feb 2024 at 18:34, Sri Potluri  wrote:
>>>
>>>> Hello Spark Community,
>>>>
>>>> I am currently leveraging Spark on Kubernetes, managed by the Spark
>>>> Operator, for running various Spark applications. While the system
>>>> generally works well, I've encountered a challenge related to how Spark
>>>> applications handle executor failures, specifically in scenarios where
>>>> executors enter an error state due to persistent issues.
>>>>
>>>> *Problem Description*
>>>>
>>>> When an executor of a Spark application fails, the system attempts to
>>>> maintain the desired level of parallelism by automatically recreating a new
>>>> executor to replace the failed one. While this behavior is beneficial for
>>>> transient errors, ensuring that the application continues to run, it
>>>> becomes problematic in cases where the failure is due to a persistent issue
>>>> (such as misconfiguration, inaccessible external resources, or incompatible
>>>> environment settings). In such scenarios, the application enters a loop,
>>>> continuously trying to recreate executors, which leads to resource wastage
>>>> and complicates application management.
>>>>
>>>> *Desired Behavior*
>>>>
>>>> Ideally, I would like to have a mechanism to limit the number of
>>>> retries for executor recreation. If the system fails to successfully create
>>>> an executor more than a specified number of times (e.g., 5 attempts), the
>>>> entire Spark application should fail and stop trying to recreate the
>>>> executor. This behavior would help in efficiently managing resources and
>>>> avoiding prolonged failure states.
>>>>
>>>> *Questions for the Community*
>>>>
>>>> 1. Is there an existing configuration or method within Spark or the
>>>> Spark Operator to limit executor recreation attempts and fail the job after
>>>> reaching a threshold?
>>>>
>>>> 2. Has anyone else encountered similar challenges and found workarounds
>>>> or solutions that could be applied in this context?
>>>>
>>>>
>>>> *Additional Context*
>>>>
>>>> I have explored Spark's task and stage retry configurations
>>>> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
>>>> do not directly address the issue of limiting executor creation retries.
>>>> Implementing a custom monitoring solution to track executor failures and
>>>> manually stop the application is a potential workaround, but it would be
>>>> preferable to have a more integrated solution.
>>>>
>>>> I appreciate any guidance, insights, or feedback you can provide on
>>>> this matter.
>>>>
>>>> Thank you for your time and support.
>>>>
>>>> Best regards,
>>>> Sri P
>>>>
>>>
>
> --
> Thank you,
> Sindhu P.
>


Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Cheng Pan
Spark has long supported a window-based executor failure-tracking mechanism
on YARN; SPARK-41210 [1][2] (included in Spark 3.5.0) extended this feature
to K8s.

[1] https://issues.apache.org/jira/browse/SPARK-41210
[2] https://github.com/apache/spark/pull/38732
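
As a sketch of how this is used (the config names below come from SPARK-41210;
please verify them against the Spark 3.5.0 documentation, and note they can
equally be passed via spark-submit --conf or the operator's sparkConf):

# Sketch (Spark 3.5.0+): fail the application once executor failures exceed a
# threshold within a sliding validity window.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("bounded-executor-failures")
         .config("spark.executor.maxNumFailures", "5")              # give up after 5 executor failures
         .config("spark.executor.failuresValidityInterval", "10m")  # only count failures from the last 10 minutes
         .getOrCreate())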

Thanks,
Cheng Pan


> On Feb 19, 2024, at 23:59, Sri Potluri  wrote:
> 
> Hello Spark Community,
> 
> I am currently leveraging Spark on Kubernetes, managed by the Spark Operator, 
> for running various Spark applications. While the system generally works 
> well, I've encountered a challenge related to how Spark applications handle 
> executor failures, specifically in scenarios where executors enter an error 
> state due to persistent issues.
> 
> Problem Description
> 
> When an executor of a Spark application fails, the system attempts to 
> maintain the desired level of parallelism by automatically recreating a new 
> executor to replace the failed one. While this behavior is beneficial for 
> transient errors, ensuring that the application continues to run, it becomes 
> problematic in cases where the failure is due to a persistent issue (such as 
> misconfiguration, inaccessible external resources, or incompatible 
> environment settings). In such scenarios, the application enters a loop, 
> continuously trying to recreate executors, which leads to resource wastage 
> and complicates application management.
> 
> Desired Behavior
> 
> Ideally, I would like to have a mechanism to limit the number of retries for 
> executor recreation. If the system fails to successfully create an executor 
> more than a specified number of times (e.g., 5 attempts), the entire Spark 
> application should fail and stop trying to recreate the executor. This 
> behavior would help in efficiently managing resources and avoiding prolonged 
> failure states.
> 
> Questions for the Community
> 
> 1. Is there an existing configuration or method within Spark or the Spark 
> Operator to limit executor recreation attempts and fail the job after 
> reaching a threshold?
>
> 2. Has anyone else encountered similar challenges and found workarounds or 
> solutions that could be applied in this context?
> 
> 
> Additional Context
> 
> I have explored Spark's task and stage retry configurations 
> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these 
> do not directly address the issue of limiting executor creation retries. 
> Implementing a custom monitoring solution to track executor failures and 
> manually stop the application is a potential workaround, but it would be 
> preferable to have a more integrated solution.
> 
> I appreciate any guidance, insights, or feedback you can provide on this 
> matter.
> 
> Thank you for your time and support.
> 
> Best regards,
> Sri P





Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Sri Potluri
 Spark applications. While the system
>>> generally works well, I've encountered a challenge related to how Spark
>>> applications handle executor failures, specifically in scenarios where
>>> executors enter an error state due to persistent issues.
>>>
>>> *Problem Description*
>>>
>>> When an executor of a Spark application fails, the system attempts to
>>> maintain the desired level of parallelism by automatically recreating a new
>>> executor to replace the failed one. While this behavior is beneficial for
>>> transient errors, ensuring that the application continues to run, it
>>> becomes problematic in cases where the failure is due to a persistent issue
>>> (such as misconfiguration, inaccessible external resources, or incompatible
>>> environment settings). In such scenarios, the application enters a loop,
>>> continuously trying to recreate executors, which leads to resource wastage
>>> and complicates application management.
>>>
>>> *Desired Behavior*
>>>
>>> Ideally, I would like to have a mechanism to limit the number of retries
>>> for executor recreation. If the system fails to successfully create an
>>> executor more than a specified number of times (e.g., 5 attempts), the
>>> entire Spark application should fail and stop trying to recreate the
>>> executor. This behavior would help in efficiently managing resources and
>>> avoiding prolonged failure states.
>>>
>>> *Questions for the Community*
>>>
>>> 1. Is there an existing configuration or method within Spark or the
>>> Spark Operator to limit executor recreation attempts and fail the job after
>>> reaching a threshold?
>>>
>>> 2. Has anyone else encountered similar challenges and found workarounds
>>> or solutions that could be applied in this context?
>>>
>>>
>>> *Additional Context*
>>>
>>> I have explored Spark's task and stage retry configurations
>>> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
>>> do not directly address the issue of limiting executor creation retries.
>>> Implementing a custom monitoring solution to track executor failures and
>>> manually stop the application is a potential workaround, but it would be
>>> preferable to have a more integrated solution.
>>>
>>> I appreciate any guidance, insights, or feedback you can provide on this
>>> matter.
>>>
>>> Thank you for your time and support.
>>>
>>> Best regards,
>>> Sri P
>>>
>>

-- 
Thank you,
Sindhu P.


Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Mich Talebzadeh
Is there an existing configuration or method within Spark or the Spark
>> Operator to limit executor recreation attempts and fail the job after
>> reaching a threshold?
>>
>> 2. Has anyone else encountered similar challenges and found workarounds
>> or solutions that could be applied in this context?
>>
>>
>> *Additional Context*
>>
>> I have explored Spark's task and stage retry configurations
>> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
>> do not directly address the issue of limiting executor creation retries.
>> Implementing a custom monitoring solution to track executor failures and
>> manually stop the application is a potential workaround, but it would be
>> preferable to have a more integrated solution.
>>
>> I appreciate any guidance, insights, or feedback you can provide on this
>> matter.
>>
>> Thank you for your time and support.
>>
>> Best regards,
>> Sri P
>>
>


Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Mich Talebzadeh
I am not aware of any configuration parameter in Spark classic to
limit executor creation. Because of fault tolerance, Spark will try to
recreate failed executors. I am not that familiar with the Spark operator
for k8s; there may be something there.

Have you considered custom monitoring and handling within Spark itself,
using something like max_retries = 5?
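
As a rough sketch only (run_job() below is a hypothetical wrapper around the
actual Spark work; adapt it to your job), the driver-side handling could look
like this:

import time

max_retries = 5
retries = 0

while retries < max_retries:
    try:
        run_job()                 # hypothetical function that runs the Spark job
    except Exception as e:
        print(f"Attempt {retries + 1} failed: {str(e)}")
        retries += 1              # Increment the retry count
        time.sleep(60)            # Back off before retrying
    else:
        break                     # Job completed successfully
else:
    raise RuntimeError(f"Job failed after {max_retries} attempts")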

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Wernher von Braun
<https://en.wikipedia.org/wiki/Wernher_von_Braun>).


On Mon, 19 Feb 2024 at 18:34, Sri Potluri  wrote:

> Hello Spark Community,
>
> I am currently leveraging Spark on Kubernetes, managed by the Spark
> Operator, for running various Spark applications. While the system
> generally works well, I've encountered a challenge related to how Spark
> applications handle executor failures, specifically in scenarios where
> executors enter an error state due to persistent issues.
>
> *Problem Description*
>
> When an executor of a Spark application fails, the system attempts to
> maintain the desired level of parallelism by automatically recreating a new
> executor to replace the failed one. While this behavior is beneficial for
> transient errors, ensuring that the application continues to run, it
> becomes problematic in cases where the failure is due to a persistent issue
> (such as misconfiguration, inaccessible external resources, or incompatible
> environment settings). In such scenarios, the application enters a loop,
> continuously trying to recreate executors, which leads to resource wastage
> and complicates application management.
>
> *Desired Behavior*
>
> Ideally, I would like to have a mechanism to limit the number of retries
> for executor recreation. If the system fails to successfully create an
> executor more than a specified number of times (e.g., 5 attempts), the
> entire Spark application should fail and stop trying to recreate the
> executor. This behavior would help in efficiently managing resources and
> avoiding prolonged failure states.
>
> *Questions for the Community*
>
> 1. Is there an existing configuration or method within Spark or the Spark
> Operator to limit executor recreation attempts and fail the job after
> reaching a threshold?
>
> 2. Has anyone else encountered similar challenges and found workarounds or
> solutions that could be applied in this context?
>
>
> *Additional Context*
>
> I have explored Spark's task and stage retry configurations
> (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
> do not directly address the issue of limiting executor creation retries.
> Implementing a custom monitoring solution to track executor failures and
> manually stop the application is a potential workaround, but it would be
> preferable to have a more integrated solution.
>
> I appreciate any guidance, insights, or feedback you can provide on this
> matter.
>
> Thank you for your time and support.
>
> Best regards,
> Sri P
>


[Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures

2024-02-19 Thread Sri Potluri
Hello Spark Community,

I am currently leveraging Spark on Kubernetes, managed by the Spark
Operator, for running various Spark applications. While the system
generally works well, I've encountered a challenge related to how Spark
applications handle executor failures, specifically in scenarios where
executors enter an error state due to persistent issues.

*Problem Description*

When an executor of a Spark application fails, the system attempts to
maintain the desired level of parallelism by automatically recreating a new
executor to replace the failed one. While this behavior is beneficial for
transient errors, ensuring that the application continues to run, it
becomes problematic in cases where the failure is due to a persistent issue
(such as misconfiguration, inaccessible external resources, or incompatible
environment settings). In such scenarios, the application enters a loop,
continuously trying to recreate executors, which leads to resource wastage
and complicates application management.

*Desired Behavior*

Ideally, I would like to have a mechanism to limit the number of retries
for executor recreation. If the system fails to successfully create an
executor more than a specified number of times (e.g., 5 attempts), the
entire Spark application should fail and stop trying to recreate the
executor. This behavior would help in efficiently managing resources and
avoiding prolonged failure states.

*Questions for the Community*

1. Is there an existing configuration or method within Spark or the Spark
Operator to limit executor recreation attempts and fail the job after
reaching a threshold?

2. Has anyone else encountered similar challenges and found workarounds or
solutions that could be applied in this context?


*Additional Context*

I have explored Spark's task and stage retry configurations
(`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these
do not directly address the issue of limiting executor creation retries.
Implementing a custom monitoring solution to track executor failures and
manually stop the application is a potential workaround, but it would be
preferable to have a more integrated solution.

I appreciate any guidance, insights, or feedback you can provide on this
matter.

Thank you for your time and support.

Best regards,
Sri P


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jon Rodríguez Aranguren
Dear Jörn Franke, Jayabindu Singh and Spark Community members,

Thank you profoundly for your initial insights. I feel it's necessary to
provide more precision on our setup to facilitate a deeper understanding.

We're interfacing with S3 Compatible storages, but our operational context
is somewhat distinct. Our infrastructure doesn't lean on conventional cloud
providers like AWS. Instead, we've architected our environment on
On-Premise Kubernetes distributions, specifically k0s and Openshift.

Our objective extends beyond just handling S3 keys. We're orchestrating a
solution that integrates Azure SPNs, API Credentials, and other sensitive
credentials, intending to make Kubernetes' native secrets our central
management hub. The aspiration is to have a universally deployable JAR, one
that can function unmodified across different ecosystems like EMR,
Databricks (on both AWS and Azure), etc. Platforms like Databricks have
already made strides in this direction, allowing secrets to be woven
directly into the Spark Conf through mechanisms like
{{secret_scope/secret_name}}, which are resolved dynamically.

The spark-on-k8s-operator's user guide suggests the feasibility of mounting
secrets. However, a gap exists in our understanding of how to subsequently
access these mounted secret values within the Spark application's context.

Here lies my inquiry: is the spark-on-k8s-operator currently equipped to
support this level of integration? If it does, any elucidation on the
method or best practices would be pivotal for our project. Alternatively,
if you could point me to resources or community experts who have tackled
similar challenges, it would be of immense assistance.

Thank you for bearing with the intricacies of our query, and I appreciate
your continued guidance in this endeavor.

Warm regards,

Jon Rodríguez Aranguren.

On Sat, 30 Sept 2023 at 23:19, Jayabindu Singh ()
wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> On 30 Sept 2023, at 03:41, Jon Rodríguez Aranguren <
>> jon.r.arangu...@gmail.com> wrote:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>
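
Regarding the question above about reading mounted secret values from inside
the application: Spark on Kubernetes has built-in submission-time options for
this, e.g. spark.kubernetes.driver.secrets.[SecretName]=<mount path> to mount
a secret as files, and spark.kubernetes.driver.secretKeyRef.[EnvName]=<secret
name>:<key> to expose a single key as an environment variable (matching
executor.* options exist). Once mounted or injected, the values are ordinary
files or environment variables. A minimal sketch, where the secret, key and
path names are placeholders:

# Sketch: read secret values that were mounted/injected at submission time, e.g. via
#   --conf spark.kubernetes.driver.secrets.s3-credentials=/etc/secrets
#   --conf spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY=s3-credentials:secret-key
# The secret, key and path names are placeholders.
import os

aws_secret_from_env = os.environ.get("AWS_SECRET_ACCESS_KEY")  # injected via secretKeyRef

with open("/etc/secrets/secret-key") as f:                     # mounted via ...driver.secrets.*
    aws_secret_from_file = f.read().strip()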


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
There is nowadays more of a trend to move away from static
credentials/certificates that are stored in a secret vault. The issue is
that rotating them is complex, once they are leaked they can be abused, and
making minimal permissions feasible is cumbersome. That is why keyless
approaches are used for A2A access (workload identity federation was
mentioned). E.g. in AWS EKS you would build this on OIDC
(https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html)
and configure this instead of using secrets. Similar approaches exist in
other clouds and even on-premise (e.g. SPIFFE, https://spiffe.io/).

Whether this will become the standard is difficult to say; for sure they
seem to be easier to manage.

Since you seem to have a Kubernetes setup, which means a lot of extra work,
infrastructure cost and security issues per cloud/data centre, workload
identity federation may ease this compared to a secret store.




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Jörn Franke
With OIDC something comparable is possible:
https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html




Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-10-01 Thread Mich Talebzadeh
It seems that workload identity
<https://cloud.google.com/iam/docs/workload-identity-federation> is not
available on AWS. Workload Identity replaces the need to use Metadata
concealment on exposed storage such as s3 and gcs. The sensitive metadata
protected by metadata concealment is also protected by Workload Identity.

Both Google Cloud Kubernetes (GKE
<https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity>)
and Azure Kubernetes Service
<https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview>
support Workload Identity. Taking notes from Google Cloud: "Workload
Identity is the recommended way for your workloads running on Google
Kubernetes Engine (GKE) to access Google Cloud services in a secure and
manageable way."


HTH


Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom


   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh  wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8 and utilizing
> IAM.
> It will save you from secret management headaches and also allows a lot
> more flexibility on access control and option to allow access to multiple
> S3 buckets in the same pod.
> We have implemented this across Azure, Google and AWS. Azure does require
> some extra work to make it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:
>
>> Don’t use static iam (s3) credentials. It is an outdated insecure method
>> - even AWS recommend against using this for anything (cf eg
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
>> ).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to the
>> pods (for EKS it means using OIDC, cf
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> On 30 Sept 2023, at 03:41, Jon Rodríguez Aranguren <
>> jon.r.arangu...@gmail.com> wrote:
>>
>> 
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and it
>> is my hope that some of you might have insights on a specific challenge I'm
>> facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
>>
>>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jayabindu Singh
Hi Jon,

Using IAM as suggested by Jörn is the best approach.
We recently moved our Spark workload from HDP to Spark on K8s and are
utilizing IAM.
It will save you from secret-management headaches and also allows a lot
more flexibility on access control, including the option to allow access to
multiple S3 buckets from the same pod.
We have implemented this across Azure, Google and AWS. Azure does require
some extra work to make it work.

On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke  wrote:

> Don’t use static iam (s3) credentials. It is an outdated insecure method -
> even AWS recommend against using this for anything (cf eg
> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html
> ).
> It is almost a guarantee to get your data stolen and your account
> manipulated.
>
> If you need to use kubernetes (which has its own very problematic security
> issues) then assign AWS IAM roles with minimal permissions to the pods (for
> EKS it means using OIDC, cf
> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>
> On 30 Sept 2023, at 03:41, Jon Rodríguez Aranguren <
> jon.r.arangu...@gmail.com> wrote:
>
> 
> Dear Spark Community Members,
>
> I trust this message finds you all in good health and spirits.
>
> I'm reaching out to the collective expertise of this esteemed community
> with a query regarding Spark on Kubernetes. As a newcomer, I have always
> admired the depth and breadth of knowledge shared within this forum, and it
> is my hope that some of you might have insights on a specific challenge I'm
> facing.
>
> I am currently trying to configure multiple Kubernetes secrets, notably
> multiple S3 keys, at the SparkConf level for a Spark application. My
> objective is to understand the best approach or methods to ensure that
> these secrets can be smoothly accessed by the Spark application.
>
> If any of you have previously encountered this scenario or possess
> relevant insights on the matter, your guidance would be highly beneficial.
>
> Thank you for your time and consideration. I'm eager to learn from the
> experiences and knowledge present within this community.
>
> Warm regards,
> Jon
>
>


Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-30 Thread Jörn Franke
Don’t use static iam (s3) credentials. It is an outdated insecure method - even 
AWS recommend against using this for anything (cf eg 
https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).
It is almost a guarantee to get your data stolen and your account manipulated. 

If you need to use kubernetes (which has its own very problematic security 
issues) then assign AWS IAM roles with minimal permissions to the pods (for EKS 
it means using OIDC, cf 
https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
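
For what it is worth, once the driver and executor pods run under a service
account that EKS has bound to an IAM role via OIDC/IRSA as above, the Spark
side can stay free of static keys. A minimal sketch, where the service
account name, bucket and path are assumptions:

# Sketch: S3A access without static keys. Assumes the pods run under a K8s
# service account bound to an IAM role (IRSA), e.g. set at submission time via
#   --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-irsa
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("s3a-with-irsa")
         # let the AWS SDK resolve credentials from the web identity token
         # that IRSA projects into the pod, instead of static access keys
         .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                 "com.amazonaws.auth.WebIdentityTokenCredentialsProvider")
         .getOrCreate())

df = spark.read.parquet("s3a://my-bucket/some/path/")  # hypothetical location
df.show()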

> On 30 Sept 2023, at 03:41, Jon Rodríguez Aranguren
> wrote:
> 
> 
> Dear Spark Community Members,
> 
> I trust this message finds you all in good health and spirits.
> 
> I'm reaching out to the collective expertise of this esteemed community with 
> a query regarding Spark on Kubernetes. As a newcomer, I have always admired 
> the depth and breadth of knowledge shared within this forum, and it is my 
> hope that some of you might have insights on a specific challenge I'm facing.
> 
> I am currently trying to configure multiple Kubernetes secrets, notably 
> multiple S3 keys, at the SparkConf level for a Spark application. My 
> objective is to understand the best approach or methods to ensure that these 
> secrets can be smoothly accessed by the Spark application.
> 
> If any of you have previously encountered this scenario or possess relevant 
> insights on the matter, your guidance would be highly beneficial.
> 
> Thank you for your time and consideration. I'm eager to learn from the 
> experiences and knowledge present within this community.
> 
> Warm regards,
> Jon


Seeking Guidance on Spark on Kubernetes Secrets Configuration

2023-09-29 Thread Jon Rodríguez Aranguren
Dear Spark Community Members,

I trust this message finds you all in good health and spirits.

I'm reaching out to the collective expertise of this esteemed community
with a query regarding Spark on Kubernetes. As a newcomer, I have always
admired the depth and breadth of knowledge shared within this forum, and it
is my hope that some of you might have insights on a specific challenge I'm
facing.

I am currently trying to configure multiple Kubernetes secrets, notably
multiple S3 keys, at the SparkConf level for a Spark application. My
objective is to understand the best approach or methods to ensure that
these secrets can be smoothly accessed by the Spark application.

If any of you have previously encountered this scenario or possess relevant
insights on the matter, your guidance would be highly beneficial.

Thank you for your time and consideration. I'm eager to learn from the
experiences and knowledge present within this community.

Warm regards,
Jon


Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-22 Thread Karthick
Hi All,

It would be helpful if anyone could give pointers on the problem described.

Thanks
Karthick.

On Wed, Sep 20, 2023 at 3:03 PM Gowtham S  wrote:

> Hi Spark Community,
>
> Thank you for bringing up this issue. We've also encountered the same
> challenge and are actively working on finding a solution. It's reassuring
> to know that we're not alone in this.
>
> If you have any insights or suggestions regarding how to address this
> problem, please feel free to share them.
>
> Looking forward to hearing from others who might have encountered similar
> issues.
>
>
> Thanks and regards,
> Gowtham S
>
>
> On Tue, 19 Sept 2023 at 17:23, Karthick 
> wrote:
>
>> Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
>>
>> Dear Spark Community,
>>
>> I recently reached out to the Apache Flink community for assistance with
>> a critical issue we are facing in our IoT platform, which relies on Apache
>> Kafka and real-time data processing. We received some valuable insights and
>> suggestions from the Apache Flink community, and now, we would like to seek
>> your expertise and guidance on the same problem.
>>
>> In our IoT ecosystem, we are dealing with data streams from thousands of
>> devices, each uniquely identified. To maintain data integrity and ordering,
>> we have configured a Kafka topic with ten partitions, ensuring that each
>> device's data is directed to its respective partition based on its unique
>> identifier. While this architectural choice has been effective in
>> maintaining data order, it has unveiled a significant challenge:
>>
>> *Slow Consumer and Data Skew Problem:* When a single device experiences
>> processing delays, it acts as a bottleneck within the Kafka partition,
>> leading to delays in processing data from other devices sharing the same
>> partition. This issue severely affects the efficiency and scalability of
>> our entire data processing pipeline.
>>
>> Here are some key details:
>>
>> - Number of Devices: 1000 (with potential growth)
>> - Target Message Rate: 1000 messages per second (with expected growth)
>> - Kafka Partitions: 10 (some partitions are overloaded)
>> - We are planning to migrate from Apache Storm to Apache Flink/Spark.
>>
>> We are actively seeking guidance on the following aspects:
>>
>> *1. Independent Device Data Processing*: We require a strategy that
>> guarantees one device's processing speed does not affect other devices in
>> the same Kafka partition. In other words, we need a solution that ensures
>> the independent processing of each device's data.
>>
>> *2. Custom Partitioning Strategy:* We are looking for a custom
>> partitioning strategy to distribute the load evenly across Kafka
>> partitions. Currently, we are using Murmur hashing with the device's unique
>> identifier, but we are open to exploring alternative partitioning
>> strategies.
>>
>> *3. Determining Kafka Partition Count:* We seek guidance on how to
>> determine the optimal number of Kafka partitions to handle the target
>> message rate efficiently.
>>
>> *4. Handling Data Skew:* Strategies or techniques for handling data skew
>> within Apache Flink.
>>
>> We believe that many in your community may have faced similar challenges
>> or possess valuable insights into addressing them. Your expertise and
>> experiences can greatly benefit our team and the broader community dealing
>> with real-time data processing.
>>
>> If you have any knowledge, solutions, or references to open-source
>> projects, libraries, or community-contributed solutions that align with our
>> requirements, we would be immensely grateful for your input.
>>
>> We appreciate your prompt attention to this matter and eagerly await your
>> responses and insights. Your support will be invaluable in helping us
>> overcome this critical challenge.
>>
>> Thank you for your time and consideration.
>>
>> Thanks & regards,
>> Karthick.
>>
>


Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-20 Thread Gowtham S
Hi Spark Community,

Thank you for bringing up this issue. We've also encountered the same
challenge and are actively working on finding a solution. It's reassuring
to know that we're not alone in this.

If you have any insights or suggestions regarding how to address this
problem, please feel free to share them.

Looking forward to hearing from others who might have encountered similar
issues.


Thanks and regards,
Gowtham S


On Tue, 19 Sept 2023 at 17:23, Karthick  wrote:

> Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
>
> Dear Spark Community,
>
> I recently reached out to the Apache Flink community for assistance with a
> critical issue we are facing in our IoT platform, which relies on Apache
> Kafka and real-time data processing. We received some valuable insights and
> suggestions from the Apache Flink community, and now, we would like to seek
> your expertise and guidance on the same problem.
>
> In our IoT ecosystem, we are dealing with data streams from thousands of
> devices, each uniquely identified. To maintain data integrity and ordering,
> we have configured a Kafka topic with ten partitions, ensuring that each
> device's data is directed to its respective partition based on its unique
> identifier. While this architectural choice has been effective in
> maintaining data order, it has unveiled a significant challenge:
>
> *Slow Consumer and Data Skew Problem:* When a single device experiences
> processing delays, it acts as a bottleneck within the Kafka partition,
> leading to delays in processing data from other devices sharing the same
> partition. This issue severely affects the efficiency and scalability of
> our entire data processing pipeline.
>
> Here are some key details:
>
> - Number of Devices: 1000 (with potential growth)
> - Target Message Rate: 1000 messages per second (with expected growth)
> - Kafka Partitions: 10 (some partitions are overloaded)
> - We are planning to migrate from Apache Storm to Apache Flink/Spark.
>
> We are actively seeking guidance on the following aspects:
>
> *1. Independent Device Data Processing*: We require a strategy that
> guarantees one device's processing speed does not affect other devices in
> the same Kafka partition. In other words, we need a solution that ensures
> the independent processing of each device's data.
>
> *2. Custom Partitioning Strategy:* We are looking for a custom
> partitioning strategy to distribute the load evenly across Kafka
> partitions. Currently, we are using Murmur hashing with the device's unique
> identifier, but we are open to exploring alternative partitioning
> strategies.
>
> *3. Determining Kafka Partition Count:* We seek guidance on how to
> determine the optimal number of Kafka partitions to handle the target
> message rate efficiently.
>
> *4. Handling Data Skew:* Strategies or techniques for handling data skew
> within Apache Flink.
>
> We believe that many in your community may have faced similar challenges
> or possess valuable insights into addressing them. Your expertise and
> experiences can greatly benefit our team and the broader community dealing
> with real-time data processing.
>
> If you have any knowledge, solutions, or references to open-source
> projects, libraries, or community-contributed solutions that align with our
> requirements, we would be immensely grateful for your input.
>
> We appreciate your prompt attention to this matter and eagerly await your
> responses and insights. Your support will be invaluable in helping us
> overcome this critical challenge.
>
> Thank you for your time and consideration.
>
> Thanks & regards,
> Karthick.
>


Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

2023-09-19 Thread Karthick
Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

Dear Spark Community,

I recently reached out to the Apache Flink community for assistance with a
critical issue we are facing in our IoT platform, which relies on Apache
Kafka and real-time data processing. We received some valuable insights and
suggestions from the Apache Flink community, and now, we would like to seek
your expertise and guidance on the same problem.

In our IoT ecosystem, we are dealing with data streams from thousands of
devices, each uniquely identified. To maintain data integrity and ordering,
we have configured a Kafka topic with ten partitions, ensuring that each
device's data is directed to its respective partition based on its unique
identifier. While this architectural choice has been effective in
maintaining data order, it has unveiled a significant challenge:

*Slow Consumer and Data Skew Problem:* When a single device experiences
processing delays, it acts as a bottleneck within the Kafka partition,
leading to delays in processing data from other devices sharing the same
partition. This issue severely affects the efficiency and scalability of
our entire data processing pipeline.

Here are some key details:

- Number of Devices: 1000 (with potential growth)
- Target Message Rate: 1000 messages per second (with expected growth)
- Kafka Partitions: 10 (some partitions are overloaded)
- We are planning to migrate from Apache Storm to Apache Flink/Spark.

We are actively seeking guidance on the following aspects:

*1. Independent Device Data Processing*: We require a strategy that
guarantees one device's processing speed does not affect other devices in
the same Kafka partition. In other words, we need a solution that ensures
the independent processing of each device's data.

*2. Custom Partitioning Strategy:* We are looking for a custom partitioning
strategy to distribute the load evenly across Kafka partitions. Currently,
we are using Murmur hashing with the device's unique identifier, but we are
open to exploring alternative partitioning strategies.

*3. Determining Kafka Partition Count:* We seek guidance on how to
determine the optimal number of Kafka partitions to handle the target
message rate efficiently.

*4. Handling Data Skew:* Strategies or techniques for handling data skew
within Apache Flink.

We believe that many in your community may have faced similar challenges or
possess valuable insights into addressing them. Your expertise and
experiences can greatly benefit our team and the broader community dealing
with real-time data processing.

If you have any knowledge, solutions, or references to open-source
projects, libraries, or community-contributed solutions that align with our
requirements, we would be immensely grateful for your input.

We appreciate your prompt attention to this matter and eagerly await your
responses and insights. Your support will be invaluable in helping us
overcome this critical challenge.

Thank you for your time and consideration.

Thanks & regards,
Karthick.
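
For reference on point 1, one pattern worth noting: in Spark Structured
Streaming the Kafka partition count does not have to bound the processing
parallelism, because the stream can be repartitioned by device id after the
read. A minimal sketch (assuming the spark-sql-kafka connector is on the
classpath; the topic, brokers and the JSON field name device_id are
assumptions):

# Sketch: decouple per-device processing from the Kafka partition layout by
# repartitioning on device_id after reading. Names below are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("iot-pipeline").getOrCreate()

raw = (spark.readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", "broker:9092")
       .option("subscribe", "iot-events")
       .load())

events = raw.select(
    F.get_json_object(F.col("value").cast("string"), "$.device_id").alias("device_id"),
    F.col("value").cast("string").alias("payload"))

# More tasks than Kafka partitions: a slow device now holds up only its own
# task within a micro-batch, not every device hashed to the same partition.
per_device = events.repartition(100, "device_id")

query = (per_device.writeStream
         .format("console")
         .option("truncate", "false")
         .start())
query.awaitTermination()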


Fwd:  Wednesday: Join 6 Members at "Ofir Press | Complementing Scale: Novel Guidance Methods for Improving LMs"

2023-08-24 Thread Mich Talebzadeh
They have recently combined the Apache Spark and AI meetups in London.

An online session worth attending for some?

HTH

Mich

   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>


 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




-- Forwarded message -
From: Apache Spark+AI London 
Date: Thu, 24 Aug 2023 at 20:01
Subject:  Wednesday: Join 6 Members at "Ofir Press | Complementing Scale:
Novel Guidance Methods for Improving LMs"
To: 


Apache Spark+AI London invites you to keep connecting

Wednesday
Ofir Press | Complementing Scale: Novel Guidance Methods for Improving LMs

Guidance

2020-07-27 Thread Suat Toksöz
Hi everyone, I want to ask for guidance on my log analyzer platform idea.
I have an Elasticsearch system which collects the logs from different
platforms and creates alerts. The system writes the alerts to an index on
ES. Also, my alerts are stored in a folder as JSON (multi-line format).

The Goals:

   1. Read the JSON folder or ES index as a stream (pick up new entries
   within 5 minutes)
   2. Select only the alerts that I want to work on (alert.id = 100,
   status = true, ...)
   3. Create a DataFrame with a 10-minute window
   4. Run a query on that DataFrame grouping by IP (if the same IP gets 3
   alerts, show me the result)
   5. All the coding should be in Python


The idea is something like this; my question is how I should proceed with
this task. What are the technologies that I should use?

Can *Apache Spark + Python + PySpark + Koalas* handle this?
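
For what it is worth, steps 1-4 map fairly directly onto PySpark Structured
Streaming. A minimal sketch, where the folder path and the field names
(alert_id, status, ip, event_time) are assumptions about the JSON layout:

# Sketch: stream multi-line JSON alerts from a folder, keep selected alerts,
# and flag IPs with 3+ alerts in a 10-minute window. Field names are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, TimestampType

spark = SparkSession.builder.appName("alert-analyzer").getOrCreate()

schema = StructType([
    StructField("alert_id", StringType()),
    StructField("status", BooleanType()),
    StructField("ip", StringType()),
    StructField("event_time", TimestampType()),
])

alerts = (spark.readStream
          .schema(schema)
          .option("multiLine", "true")      # 1. read the folder of multi-line JSON as a stream
          .json("/path/to/alerts"))

suspicious = (alerts
              .filter((F.col("alert_id") == "100") & F.col("status"))      # 2. alerts of interest
              .withWatermark("event_time", "10 minutes")
              .groupBy(F.window("event_time", "10 minutes"), F.col("ip"))  # 3. 10-minute window per IP
              .count()
              .filter(F.col("count") >= 3))                                # 4. same IP with 3+ alerts

query = (suspicious.writeStream
         .outputMode("update")
         .format("console")
         .start())
query.awaitTermination()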

-- 

Best regards,

*Suat Toksoz*


Re: Need some guidance

2015-04-14 Thread Victor Tso-Guillen
Thanks, yes. I was using Int for my V and didn't get the second param in
the second closure right :)

On Mon, Apr 13, 2015 at 1:55 PM, Dean Wampler deanwamp...@gmail.com wrote:

 That appears to work, with a few changes to get the types correct:

 input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) =>
 agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com
 wrote:

 How about this?

  input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2).collect()

 On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com
 wrote:

 The problem with using collect is that it will fail for large data sets,
 as you'll attempt to copy the entire RDD to the memory of your driver
 program. The following works (Scala syntax, but similar to Python):

 scala val i1 = input.distinct.groupByKey
 scala i1.foreach(println)
 (1,CompactBuffer(beta, alpha, foo))
 (3,CompactBuffer(foo))
 (2,CompactBuffer(alpha, bar))

 scala val i2 = i1.map(tup = (tup._1, tup._2.size))
 scala i1.foreach(println)
 (1,3)
 (3,1)
 (2,2)

 The i2 line passes a function that takes a tuple argument, then
 constructs a new output tuple with the first element and the size of the
 second (each CompactBuffer). An alternative pattern match syntax would be.

 scala val i2 = i1.map { case (key, buffer) = (key, buffer.size) }

 This should work as long as none of the CompactBuffers are too large,
 which could happen for extremely large data sets.

 dean

 Dean Wampler, Ph.D.
 Author: Programming Scala, 2nd Edition
 http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
 Typesafe http://typesafe.com
 @deanwampler http://twitter.com/deanwampler
 http://polyglotprogramming.com

 On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com
 wrote:

 **Learning the ropes**

 I'm trying to grasp the concept of using the pipeline in pySpark...

 Simplified example:
 
 list=[(1,alpha),(1,beta),(1,foo),(1,alpha),(2,alpha),(2,alpha),(2,bar),(3,foo)]

 Desired outcome:
 [(1,3),(2,2),(3,1)]

 Basically for each key, I want the number of unique values.

 I've tried different approaches, but am I really using Spark
 effectively?  I wondered if I would do something like:
  input=sc.parallelize(list)
  input.groupByKey().collect()

 Then I wondered if I could do something like a foreach over each key
 value, and then map the actual values and reduce them.  Pseudo-code:

 input.groupbykey()
 .keys
 .foreach(_.values
 .map(lambda x: x,1)
 .reducebykey(lambda a,b:a+b)
 .count()
 )

 I was somehow hoping that the key would get the current value of count,
 and thus be the count of the unique keys, which is exactly what I think I'm
 looking for.

 Am I way off base on how I could accomplish this?

 Marco







Re: Need some guidance

2015-04-13 Thread Dean Wampler
That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey((s: String) => 1, (agg: Int, s: String) =>
agg + 1, (agg1: Int, agg2: Int) => agg1 + agg2)

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote:

 How about this?

 input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
 (agg1: Int, agg2: Int) => agg1 + agg2).collect()


Re: Need some guidance

2015-04-13 Thread Victor Tso-Guillen
How about this?

input.distinct().combineByKey((v: V) => 1, (agg: Int, x: Int) => agg + 1,
(agg1: Int, agg2: Int) => agg1 + agg2).collect()

Re: Need some guidance

2015-04-13 Thread Dean Wampler
The problem with using collect is that it will fail for large data sets, as
you'll attempt to copy the entire RDD to the memory of your driver program.
The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))

scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The i2 line passes a function that takes a tuple argument, then
constructs a new output tuple with the first element and the size of the
second (each CompactBuffer). An alternative pattern-match syntax would be:

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which
could happen for extremely large data sets.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com


Need some guidance

2015-04-13 Thread Marco Shaw
**Learning the ropes**

I'm trying to grasp the concept of using the pipeline in pySpark...

Simplified example:

list=[(1,"alpha"),(1,"beta"),(1,"foo"),(1,"alpha"),(2,"alpha"),(2,"alpha"),(2,"bar"),(3,"foo")]

Desired outcome:
[(1,3),(2,2),(3,1)]

Basically for each key, I want the number of unique values.

I've tried different approaches, but am I really using Spark effectively?
I wondered if I would do something like:
 input=sc.parallelize(list)
 input.groupByKey().collect()

Then I wondered if I could do something like a foreach over each key value,
and then map the actual values and reduce them.  Pseudo-code:

input.groupbykey()
.keys
.foreach(_.values
.map(lambda x: x,1)
.reducebykey(lambda a,b:a+b)
.count()
)

I was somehow hoping that the key would get the current value of count, and
thus be the count of the unique keys, which is exactly what I think I'm
looking for.

Am I way off base on how I could accomplish this?

Marco
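
As an editorial illustration, a PySpark version of the distinct-then-count
approach suggested in the replies above (shown there as combineByKey in Scala);
the names below are placeholders, not code from the thread.

from pyspark import SparkContext

sc = SparkContext("local[*]", "UniqueValuesPerKey")

pairs = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
         (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]

# Drop duplicate (key, value) pairs first, then count how many distinct
# values remain for each key.
counts = (sc.parallelize(pairs)
            .distinct()
            .map(lambda kv: (kv[0], 1))
            .reduceByKey(lambda a, b: a + b))

print(sorted(counts.collect()))   # [(1, 3), (2, 2), (3, 1)]
sc.stop()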


Re: guidance on simple unit testing with Spark

2014-06-16 Thread Daniel Siegmann
If you don't want to refactor your code, you can put your input into a test
file. After the job runs, read the data from the output file you specified
(you probably want this to be a temp file that is deleted on exit). Of course,
that is not really a unit test - Matei's suggestion is preferable (this is how
we test). However, if you have a long and complex flow, you might unit test
the different parts, and then have an integration test which reads from the
files and tests the whole flow together (I do this as well).




-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io
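
As an editorial sketch of the file-in / file-out pattern described in the reply
above: a minimal pytest example in PySpark, where run_job stands in for the real
application's entry point (all names here are illustrative, not from the thread).

from pyspark import SparkContext


def run_job(sc, in_path, out_path):
    # Stand-in for the real job: read a file, transform it, write a file.
    sc.textFile(in_path).map(lambda line: line.upper()).saveAsTextFile(out_path)


def test_job_end_to_end(tmp_path):
    sc = SparkContext("local[2]", "file-based-test")
    try:
        # 1. Write the test input to a temporary file.
        in_path = tmp_path / "input.txt"
        in_path.write_text("line 1\nline 2\n")

        # 2. Run the whole flow exactly as the real job would.
        out_path = tmp_path / "output"   # temp dir, cleaned up by pytest
        run_job(sc, str(in_path), str(out_path))

        # 3. Read the output back and compare it to what we expect.
        result = set(sc.textFile(str(out_path)).collect())
        assert result == {"LINE 1", "LINE 2"}
    finally:
        sc.stop()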



Re: guidance on simple unit testing with Spark

2014-06-13 Thread Matei Zaharia
You need to factor your program so that it’s not just a main(). This is not a 
Spark-specific issue, it’s about how you’d unit test any program in general. In 
this case, your main() creates a SparkContext, so you can’t pass one from 
outside, and your code has to read data from a file and write it to a file. It 
would be better to move your code for transforming data into a new function:

def processData(lines: RDD[String]): RDD[String] = {
  // build and return your "res" variable
}

Then you can unit-test this directly on data you create in your program:

val myLines = sc.parallelize(Seq("line 1", "line 2"))
val result = GetInfo.processData(myLines).collect()
assert(result.toSet === Set("res 1", "res 2"))

Matei

On Jun 13, 2014, at 2:42 PM, SK skrishna...@gmail.com wrote:

 Hi,
 
 I have looked through some of the test examples and also the brief
 documentation on unit testing at
 http://spark.apache.org/docs/latest/programming-guide.html#unit-testing, but
 still don't have a good understanding of writing unit tests using the Spark
 framework. Previously, I have written unit tests using the specs2 framework
 and have got them to work in Scalding. I tried to use the specs2 framework
 with Spark, but could not find any simple examples I could follow. I am open
 to specs2 or FunSuite, whichever works best with Spark. I would like some
 additional guidance, or some simple sample code using specs2 or FunSuite. My
 code is provided below.
 
 
 I have the following code in src/main/scala/GetInfo.scala. It reads a JSON
 file and extracts some data. It takes the input file (args(0)) and output
 file (args(1)) as arguments.
 
 object GetInfo {

   def main(args: Array[String]) {
     val inp_file = args(0)
     val conf = new SparkConf().setAppName("GetInfo")
     val sc = new SparkContext(conf)
     val res = sc.textFile(inp_file)
       .map(line => parse(line))
       .map(json => {
         implicit lazy val formats = org.json4s.DefaultFormats
         val aid = (json \ "d" \ "TypeID").extract[Int]
         val ts = (json \ "d" \ "TimeStamp").extract[Long]
         val gid = (json \ "d" \ "ID").extract[String]
         (aid, ts, gid)
       })
       .groupBy(tup => tup._3)
       .sortByKey(true)
       .map(g => (g._1, g._2.map(_._2).max))
     res.map(tuple => "%s, %d".format(tuple._1, tuple._2)).saveAsTextFile(args(1))
   }
 }
 
 
 I would like to test the above code. My unit test is in src/test/scala. The
 code I have so far for the unit test appears below:
 
 import org.apache.spark._
 import org.specs2.mutable._

 class GetInfoTest extends Specification with java.io.Serializable {

   val data = List(
     """{"d": {"TypeID": 10, "TimeStamp": 1234, "ID": "ID1"}}""",
     """{"d": {"TypeID": 11, "TimeStamp": 5678, "ID": "ID1"}}""",
     """{"d": {"TypeID": 10, "TimeStamp": 1357, "ID": "ID2"}}""",
     """{"d": {"TypeID": 11, "TimeStamp": 2468, "ID": "ID2"}}"""
   )

   val expected_out = List(
     ("ID1", 5678),
     ("ID2", 2468)
   )

   "A GetInfo job" should {
     //*** How do I pass the data defined above as the input, and get the
     // output, which GetInfo expects as file arguments? ***
     val sc = new SparkContext("local", "GetInfo")

     //*** how do I get the output ***

     // assuming out_buffer has the output I want to match it to the
     // expected output
     "match expected output" in {
       (out_buffer == expected_out) must beTrue
     }
   }
 }
 
 I would like some help with the tasks marked with *** in the unit test
 code above. If specs2 is not the right way to go, I am also open to
 FunSuite. I would like to know how to pass the input while calling my
 program from the unit test and get the output.
 
 Thanks for your help.
 
 
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/guidance-on-simple-unit-testing-with-Spark-tp7604.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.