Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
{str(e)}")
            # Increment the retry count
            retries += 1
            # Sleep before the next attempt
            time.sleep(60)
        else:
            # Break out of the loop if the job completes successfully
            break

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

view my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one thousand expert opinions" (Wernher von Braun <https://en.wikipedia.org/wiki/Wernher_von_Braun>).

On Mon, 19 Feb 2024 at 19:21, Mich Talebzadeh wrote:

> Not that I am aware of any configuration parameter in classic Spark to
> limit executor creation. Because of fault tolerance, Spark will try to
> recreate failed executors. I am not really familiar with the Spark
> operator for k8s; there may be something there.
>
> Have you considered custom monitoring and handling within Spark itself,
> using e.g. max_retries = 5?
>
> HTH
>
> Mich Talebzadeh
> On Mon, 19 Feb 2024 at 18:34, Sri Potluri wrote:
>
> [...]

--
Thank you,
Sindhu P.
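The retry-loop fragment Mich quotes above can be fleshed out into a self-contained sketch. This is illustrative only: `run_job`, the retry limit and the sleep interval are placeholders of my own, not anything Spark provides out of the box.

```python
import time

MAX_RETRIES = 5   # illustrative threshold, as in Mich's suggestion
SLEEP_SECS = 60   # back-off between attempts


def run_job():
    # Placeholder for launching/monitoring the Spark job; should raise
    # an exception on failure. Replace with your own submission logic.
    pass


retries = 0
while retries < MAX_RETRIES:
    try:
        run_job()
    except Exception as e:
        print(f"Job failed with error: {str(e)}")
        # Increment the retry count
        retries += 1
        # Sleep before the next attempt
        time.sleep(SLEEP_SECS)
    else:
        # Break out of the loop if the job completes successfully
        break
else:
    # The while/else fires only if we never hit `break`
    raise RuntimeError(f"Job did not succeed after {MAX_RETRIES} attempts")
```

Note this caps retries of the whole application from the outside; it does not stop Spark itself from recreating individual executors within one run.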
Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Spark has supported the window-based executor failure-tracking mechanism on YARN for a long time; SPARK-41210 [1][2] (included in 3.5.0) extended this feature to K8s.

[1] https://issues.apache.org/jira/browse/SPARK-41210
[2] https://github.com/apache/spark/pull/38732

Thanks,
Cheng Pan

> On Feb 19, 2024, at 23:59, Sri Potluri wrote:
>
> [...]
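For anyone landing on this thread later: if I read SPARK-41210 and its PR correctly, the K8s support reuses the window-based failure-tracking configuration generalised from YARN. The property names below are my reading of the change and should be verified against the Spark 3.5.0 configuration docs before relying on them:

```shell
spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --conf spark.executor.maxNumFailures=5 \
  --conf spark.executor.failuresValidityInterval=10m \
  ...
```

With settings like these, the application should fail once 5 executor failures occur within the sliding validity window, rather than recreating executors indefinitely.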
Re: [Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Not that I am aware of any configuration parameter in classic Spark to limit executor creation. Because of fault tolerance, Spark will try to recreate failed executors. I am not really familiar with the Spark operator for k8s; there may be something there.

Have you considered custom monitoring and handling within Spark itself, using e.g. max_retries = 5?

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom

On Mon, 19 Feb 2024 at 18:34, Sri Potluri wrote:

> Hello Spark Community,
>
> [...]
[Spark on Kubernetes]: Seeking Guidance on Handling Persistent Executor Failures
Hello Spark Community,

I am currently leveraging Spark on Kubernetes, managed by the Spark Operator, for running various Spark applications. While the system generally works well, I've encountered a challenge related to how Spark applications handle executor failures, specifically in scenarios where executors enter an error state due to persistent issues.

*Problem Description*

When an executor of a Spark application fails, the system attempts to maintain the desired level of parallelism by automatically recreating a new executor to replace the failed one. While this behavior is beneficial for transient errors, ensuring that the application continues to run, it becomes problematic in cases where the failure is due to a persistent issue (such as misconfiguration, inaccessible external resources, or incompatible environment settings). In such scenarios, the application enters a loop, continuously trying to recreate executors, which leads to resource wastage and complicates application management.

*Desired Behavior*

Ideally, I would like to have a mechanism to limit the number of retries for executor recreation. If the system fails to successfully create an executor more than a specified number of times (e.g., 5 attempts), the entire Spark application should fail and stop trying to recreate the executor. This behavior would help in efficiently managing resources and avoiding prolonged failure states.

*Questions for the Community*

1. Is there an existing configuration or method within Spark or the Spark Operator to limit executor recreation attempts and fail the job after reaching a threshold?

2. Has anyone else encountered similar challenges and found workarounds or solutions that could be applied in this context?

*Additional Context*

I have explored Spark's task and stage retry configurations (`spark.task.maxFailures`, `spark.stage.maxConsecutiveAttempts`), but these do not directly address the issue of limiting executor creation retries. Implementing a custom monitoring solution to track executor failures and manually stop the application is a potential workaround, but it would be preferable to have a more integrated solution.

I appreciate any guidance, insights, or feedback you can provide on this matter.

Thank you for your time and support.

Best regards,
Sri P
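On the custom-monitoring workaround mentioned above: Spark's monitoring REST API (served by the driver UI) exposes executor state at `/api/v1/applications/<app-id>/allexecutors`, so an external watchdog can poll it and fail the application once dead executors pass a threshold. A minimal sketch follows - the threshold, the UI URL, and what "fail the application" means in practice (e.g. deleting the SparkApplication resource) are assumptions left to the operator of the watchdog:

```python
import json
import urllib.request


def count_inactive_executors(executors):
    """Count executor records that are no longer active (driver excluded)."""
    return sum(
        1
        for ex in executors
        if ex.get("id") != "driver" and not ex.get("isActive", True)
    )


def should_fail_app(executors, max_failures=5):
    """True once the number of dead executors reaches the threshold."""
    return count_inactive_executors(executors) >= max_failures


def fetch_executors(ui_url, app_id):
    """Fetch executor summaries (including dead ones) from the driver UI."""
    url = f"{ui_url}/api/v1/applications/{app_id}/allexecutors"
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())
```

A cron job or sidecar would call `fetch_executors` periodically and, when `should_fail_app` returns True, tear the application down (for the Spark Operator, e.g. by deleting the SparkApplication CR with kubectl).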
Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration
Dear Jörn Franke, Jayabindu Singh and Spark Community members,

Thank you profoundly for your initial insights. I feel it's necessary to provide more precision on our setup to facilitate a deeper understanding.

We're interfacing with S3-compatible storages, but our operational context is somewhat distinct. Our infrastructure doesn't lean on conventional cloud providers like AWS. Instead, we've architected our environment on on-premise Kubernetes distributions, specifically k0s and OpenShift.

Our objective extends beyond just handling S3 keys. We're orchestrating a solution that integrates Azure SPNs, API credentials, and other sensitive credentials, intending to make Kubernetes' native secrets our central management hub. The aspiration is to have a universally deployable JAR, one that can function unmodified across different ecosystems like EMR, Databricks (on both AWS and Azure), etc. Platforms like Databricks have already made strides in this direction, allowing secrets to be woven directly into the Spark conf through mechanisms like {{secret_scope/secret_name}}, which are resolved dynamically.

The spark-on-k8s-operator's user guide suggests the feasibility of mounting secrets. However, a gap exists in our understanding of how to subsequently access these mounted secret values within the Spark application's context.

Here lies my inquiry: is the spark-on-k8s-operator currently equipped to support this level of integration? If it does, any elucidation on the method or best practices would be pivotal for our project. Alternatively, if you could point me to resources or community experts who have tackled similar challenges, it would be of immense assistance.

Thank you for bearing with the intricacies of our query, and I appreciate your continued guidance in this endeavor.

Warm regards,
Jon Rodríguez Aranguren.

On Sat, 30 Sep 2023 at 23:19, Jayabindu Singh wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
> We recently moved our spark workload from HDP to Spark on K8s and are
> utilizing IAM. It will save you from secret management headaches and also
> allows a lot more flexibility on access control and the option to allow
> access to multiple S3 buckets in the same pod. We have implemented this
> across Azure, Google and AWS. Azure does require some extra work to make
> it work.
>
> On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke wrote:
>
>> Don't use static IAM (S3) credentials. It is an outdated, insecure
>> method - even AWS recommends against using this for anything (cf. e.g.
>> https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html).
>> It is almost a guarantee to get your data stolen and your account
>> manipulated.
>>
>> If you need to use Kubernetes (which has its own very problematic
>> security issues) then assign AWS IAM roles with minimal permissions to
>> the pods (for EKS it means using OIDC, cf.
>> https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).
>>
>> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren <jon.r.arangu...@gmail.com> wrote:
>>
>> Dear Spark Community Members,
>>
>> I trust this message finds you all in good health and spirits.
>>
>> I'm reaching out to the collective expertise of this esteemed community
>> with a query regarding Spark on Kubernetes. As a newcomer, I have always
>> admired the depth and breadth of knowledge shared within this forum, and
>> it is my hope that some of you might have insights on a specific
>> challenge I'm facing.
>>
>> I am currently trying to configure multiple Kubernetes secrets, notably
>> multiple S3 keys, at the SparkConf level for a Spark application. My
>> objective is to understand the best approach or methods to ensure that
>> these secrets can be smoothly accessed by the Spark application.
>>
>> If any of you have previously encountered this scenario or possess
>> relevant insights on the matter, your guidance would be highly
>> beneficial.
>>
>> Thank you for your time and consideration. I'm eager to learn from the
>> experiences and knowledge present within this community.
>>
>> Warm regards,
>> Jon
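On the mounting question raised above: the spark-on-k8s-operator's SparkApplication CRD can both mount a whole secret as files and project individual keys into environment variables. The sketch below is from memory of the v1beta2 CRD, and the secret/key names are made up, so the field names should be checked against the operator user guide for your version:

```yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: my-app
spec:
  driver:
    # Mounts every key of the secret as a file under /mnt/secrets
    secrets:
      - name: s3-credentials
        path: /mnt/secrets
        secretType: Generic
    # Projects a single key as an environment variable
    envSecretKeyRefs:
      AWS_ACCESS_KEY_ID:
        name: s3-credentials
        key: access-key
  executor:
    secrets:
      - name: s3-credentials
        path: /mnt/secrets
        secretType: Generic
```

Inside the application, the mounted values are then just files (e.g. `/mnt/secrets/access-key`) or environment variables, which your code can read and feed into SparkConf or Hadoop configuration at startup.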
Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration
There is nowadays more of a trend to move away from static credentials/certificates that are stored in a secret vault. The issue is that rotating them is complex, once they are leaked they can be abused, making minimal permissions feasible is cumbersome, etc. That is why keyless approaches are used for A2A access (workload identity federation was mentioned). E.g. in AWS EKS you would build this on OIDC (https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html) and configure this instead of using secrets. Similar approaches exist in other clouds and even on-premise (e.g. SPIFFE, https://spiffe.io/).

Whether this will become the standard is difficult to say - for sure it seems easier to manage. Since you have a Kubernetes setup, which means a lot of extra work, infrastructure cost and security issues per cloud/data centre, workload identity federation may ease this compared to a secret store.

On 01.10.2023 at 08:27, Jon Rodríguez Aranguren wrote:

> Dear Jörn Franke, Jayabindu Singh and Spark Community members,
>
> [...]
Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration
With OIDC something comparable is possible: https://docs.aws.amazon.com/eks/latest/userguide/enable-iam-roles-for-service-accounts.html

On 01.10.2023 at 11:13, Mich Talebzadeh wrote:

> It seems that workload identity is not available on AWS. Workload Identity
> replaces the need to use metadata concealment on exposed storage such as
> S3 and GCS. The sensitive metadata protected by metadata concealment is
> also protected by Workload Identity.
>
> [...]
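As a concrete sketch of the IAM-roles-for-service-accounts route Jörn describes: annotate the Spark pods' Kubernetes service account with the IAM role, then point S3A at the AWS SDK's web-identity credentials provider so it picks up the injected token. The service-account name here is made up, and the provider class is the AWS SDK v1 name, so verify it matches the SDK bundled with your Hadoop/S3A build:

```shell
spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-irsa-sa \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
  ...
```

With this in place no static S3 keys need to be stored in Kubernetes secrets at all, which sidesteps the rotation and leakage issues discussed above.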
Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration
It seems that workload identity <https://cloud.google.com/iam/docs/workload-identity-federation> is not available on AWS. Workload Identity replaces the need to use metadata concealment on exposed storage such as S3 and GCS. The sensitive metadata protected by metadata concealment is also protected by Workload Identity.

Both Google Kubernetes Engine (GKE <https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity>) and Azure Kubernetes Service <https://learn.microsoft.com/en-us/azure/aks/workload-identity-overview> support Workload Identity. Taking notes from Google Cloud: "Workload Identity is the recommended way for your workloads running on Google Kubernetes Engine (GKE) to access Google Cloud services in a secure and manageable way."

HTH

Mich Talebzadeh,
Distinguished Technologist, Solutions Architect & Engineer
London
United Kingdom

view my LinkedIn profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

https://en.everybodywiki.com/Mich_Talebzadeh

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Sun, 1 Oct 2023 at 06:36, Jayabindu Singh wrote:

> Hi Jon,
>
> Using IAM as suggested by Jorn is the best approach.
>
> [...]
Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration
Hi Jon,

Using IAM as suggested by Jörn is the best approach. We recently moved our Spark workload from HDP to Spark on K8s using IAM. It will save you from secret-management headaches and also gives a lot more flexibility on access control, including the option to allow access to multiple S3 buckets from the same pod. We have implemented this across Azure, Google and AWS; Azure does require some extra work to make it work.

On Sat, Sep 30, 2023 at 12:05 PM Jörn Franke wrote:
> [quoted text trimmed]
Re: Seeking Guidance on Spark on Kubernetes Secrets Configuration
Don’t use static IAM (S3) credentials. It is an outdated, insecure method; even AWS recommends against using this for anything (cf. e.g. https://docs.aws.amazon.com/cli/latest/userguide/cli-authentication-user.html). It is almost a guarantee to get your data stolen and your account manipulated.

If you need to use Kubernetes (which has its own very problematic security issues), then assign AWS IAM roles with minimal permissions to the pods (for EKS this means using OIDC, cf. https://docs.aws.amazon.com/eks/latest/userguide/service_IAM_role.html).

> On 30.09.2023 at 03:41, Jon Rodríguez Aranguren <jon.r.arangu...@gmail.com> wrote:
> [quoted text trimmed]
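On the Spark side, Jörn's IRSA-on-EKS suggestion usually comes down to a couple of conf entries rather than any secrets at all: the pod gets a web-identity token from the annotated service account, and the S3A connector picks it up. A minimal sketch, not a definitive recipe; the service-account name `spark-irsa` is hypothetical, and the provider class is the AWS SDK v1 one used by hadoop-aws:

```python
def s3a_irsa_conf():
    """Spark conf entries for token-based S3 auth on EKS (no static keys).

    Assumes the driver/executor service account is annotated with an IAM
    role ARN (IRSA), so EKS injects AWS_WEB_IDENTITY_TOKEN_FILE and
    AWS_ROLE_ARN into the pods; S3A then resolves credentials from those.
    """
    return {
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "com.amazonaws.auth.WebIdentityTokenCredentialsProvider",
        # Run driver and executors under the IRSA-annotated service account.
        "spark.kubernetes.authenticate.driver.serviceAccountName": "spark-irsa",
        "spark.kubernetes.authenticate.executor.serviceAccountName": "spark-irsa",
    }

print(s3a_irsa_conf())
```

Each entry would be passed as a `--conf key=value` pair to spark-submit; note that nothing resembling an access key appears anywhere in the configuration.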
Seeking Guidance on Spark on Kubernetes Secrets Configuration
Dear Spark Community Members, I trust this message finds you all in good health and spirits. I'm reaching out to the collective expertise of this esteemed community with a query regarding Spark on Kubernetes. As a newcomer, I have always admired the depth and breadth of knowledge shared within this forum, and it is my hope that some of you might have insights on a specific challenge I'm facing. I am currently trying to configure multiple Kubernetes secrets, notably multiple S3 keys, at the SparkConf level for a Spark application. My objective is to understand the best approach or methods to ensure that these secrets can be smoothly accessed by the Spark application. If any of you have previously encountered this scenario or possess relevant insights on the matter, your guidance would be highly beneficial. Thank you for your time and consideration. I'm eager to learn from the experiences and knowledge present within this community. Warm regards, Jon
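For reference, on Jon's literal question of wiring pre-existing Kubernetes secrets through SparkConf: Spark on Kubernetes does expose secret-mounting options, even though the replies in this thread rightly steer toward IAM roles instead of static S3 keys. A sketch under assumptions: the secret name `s3-keys`, the mount path, and the key name `access-key` are all hypothetical placeholders for a secret that already exists in the namespace.

```python
def k8s_secret_conf():
    """Illustrative SparkConf entries for exposing K8s secrets to Spark pods.

    `spark.kubernetes.{driver,executor}.secrets.[SecretName]` mounts the whole
    secret as files under the given path; `secretKeyRef.[EnvName]` surfaces a
    single key of a secret as an environment variable (value format "name:key").
    """
    return {
        # Mount every key of secret "s3-keys" as files under /etc/secrets.
        "spark.kubernetes.driver.secrets.s3-keys": "/etc/secrets",
        "spark.kubernetes.executor.secrets.s3-keys": "/etc/secrets",
        # Or inject one key as an env var in driver and executors.
        "spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID": "s3-keys:access-key",
        "spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID": "s3-keys:access-key",
    }

print(sorted(k8s_secret_conf()))
```

Note the driver and executor settings must be given separately; setting only the driver side is a common source of executors failing to see the secret.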
Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
Hi All,

It would be helpful if anyone could give pointers on the problem described below.

Thanks,
Karthick.

On Wed, Sep 20, 2023 at 3:03 PM Gowtham S wrote:
> [quoted text trimmed]
Re: Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
Hi Spark Community,

Thank you for bringing up this issue. We've also encountered the same challenge and are actively working on finding a solution. It's reassuring to know that we're not alone in this.

If you have any insights or suggestions regarding how to address this problem, please feel free to share them.

Looking forward to hearing from others who might have encountered similar issues.

Thanks and regards,
Gowtham S

On Tue, 19 Sept 2023 at 17:23, Karthick wrote:
> [quoted text trimmed]
Urgent: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem
Subject: Seeking Guidance on Kafka Slow Consumer and Data Skew Problem

Dear Spark Community,

I recently reached out to the Apache Flink community for assistance with a critical issue we are facing in our IoT platform, which relies on Apache Kafka and real-time data processing. We received some valuable insights and suggestions from the Apache Flink community, and now we would like to seek your expertise and guidance on the same problem.

In our IoT ecosystem, we are dealing with data streams from thousands of devices, each uniquely identified. To maintain data integrity and ordering, we have configured a Kafka topic with ten partitions, ensuring that each device's data is directed to its respective partition based on its unique identifier. While this architectural choice has been effective in maintaining data order, it has unveiled a significant challenge:

*Slow Consumer and Data Skew Problem:* When a single device experiences processing delays, it acts as a bottleneck within the Kafka partition, leading to delays in processing data from other devices sharing the same partition. This issue severely affects the efficiency and scalability of our entire data processing pipeline.

Here are some key details:

- Number of Devices: 1000 (with potential growth)
- Target Message Rate: 1000 messages per second (with expected growth)
- Kafka Partitions: 10 (some partitions are overloaded)
- We are planning to migrate from Apache Storm to Apache Flink/Spark.

We are actively seeking guidance on the following aspects:

*1. Independent Device Data Processing:* We require a strategy that guarantees one device's processing speed does not affect other devices in the same Kafka partition. In other words, we need a solution that ensures the independent processing of each device's data.

*2. Custom Partitioning Strategy:* We are looking for a custom partitioning strategy to distribute the load evenly across Kafka partitions. Currently, we are using Murmur hashing with the device's unique identifier, but we are open to exploring alternative partitioning strategies.

*3. Determining Kafka Partition Count:* We seek guidance on how to determine the optimal number of Kafka partitions to handle the target message rate efficiently.

*4. Handling Data Skew:* Strategies or techniques for handling data skew within Apache Flink.

We believe that many in your community may have faced similar challenges or possess valuable insights into addressing them. Your expertise and experiences can greatly benefit our team and the broader community dealing with real-time data processing.

If you have any knowledge, solutions, or references to open-source projects, libraries, or community-contributed solutions that align with our requirements, we would be immensely grateful for your input.

We appreciate your prompt attention to this matter and eagerly await your responses and insights. Your support will be invaluable in helping us overcome this critical challenge.

Thank you for your time and consideration.

Thanks & regards,
Karthick.
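For question 3, a back-of-the-envelope sizing that is often used in practice (an illustrative heuristic, not an official Kafka formula): take the target message rate, divide by the sustained per-partition consumer throughput you measure, and multiply by a headroom factor for growth and skew. The numbers below are the ones from the message; the per-partition rate is an assumption you would benchmark yourself.

```python
import math

def estimate_partitions(target_msgs_per_sec: float,
                        per_partition_consume_rate: float,
                        headroom: float = 2.0) -> int:
    """Rough Kafka partition count: enough partitions that consumption keeps
    up with the target rate, scaled by a headroom factor for growth/skew.
    Illustrative heuristic only; benchmark the per-partition rate first."""
    base = target_msgs_per_sec / per_partition_consume_rate
    return max(1, math.ceil(base * headroom))

# 1000 msg/s target, a consumer that sustains ~100 msg/s per partition
# (assumed), and 2x headroom:
print(estimate_partitions(1000, 100))  # -> 20
```

For questions 1 and 4, one common mitigation is "salting" hot device keys across several partitions, which spreads a slow device's load but weakens per-device ordering to within each salt bucket; whether that trade-off is acceptable depends on how strict the ordering requirement really is.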
Fwd: Wednesday: Join 6 Members at "Ofir Press | Complementing Scale: Novel Guidance Methods for Improving LMs"
The recently combined Apache Spark and AI meetup in London. An online session worth attending for some?

HTH

Mich

-- Forwarded message --
From: Apache Spark+AI London
Date: Thu, 24 Aug 2023 at 20:01
Subject: Wednesday: Join 6 Members at "Ofir Press | Complementing Scale: Novel Guidance Methods for Improving LMs"

Wednesday: Ofir Press | Complementing Scale: Novel Guidance Methods for Improving LMs
Guidance
Hi everyone,

I want to ask for guidance on my log analyzer platform idea. I have an Elasticsearch system which collects logs from different platforms and creates alerts. The system writes the alerts to an index on ES. The alerts are also stored in a folder as JSON (multi-line format).

The goals:

1. Read the JSON folder or ES index as a stream (pick up new entries within 5 min)
2. Select only the alerts that I want to work on (alert.id = 100, status = true, ...)
3. Create a DataFrame with a 10-minute window
4. Run a query on that DataFrame, grouping by IP (if the same IP gets 3 alerts, show me the result)
5. All the coding should be in Python

The idea is something like this; my question is how I should proceed with this task. What technologies should I use? Can *Apache Spark + Python + PySpark + Koalas* handle this?

--
Best regards,
*Suat Toksoz*
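Steps 2-4 above map naturally onto PySpark Structured Streaming (roughly `spark.readStream.json(...)`, a `filter()`, then `groupBy(window(col("ts"), "10 minutes"), col("ip")).count().where("count >= 3")`). Here is a plain-Python sketch of the same windowing-and-thresholding logic so the idea can be checked without a cluster; the field names (`alert_id`, `status`, `ip`, `ts`) are assumptions about the alert JSON, not Suat's actual schema.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def flag_ips(alerts, window_minutes=10, threshold=3, alert_id=100):
    """Count filtered alerts per (tumbling window, IP); return the pairs
    that reach `threshold`. Mirrors goals 2-4 from the message above."""
    counts = defaultdict(int)
    for a in alerts:
        if a["alert_id"] != alert_id or not a["status"]:
            continue  # goal 2: keep only the alerts of interest
        # goal 3: assign the event to a tumbling window by truncating its
        # timestamp down to the enclosing 10-minute boundary
        win = a["ts"] - timedelta(minutes=a["ts"].minute % window_minutes,
                                  seconds=a["ts"].second,
                                  microseconds=a["ts"].microsecond)
        counts[(win, a["ip"])] += 1
    # goal 4: report (window, ip) pairs that crossed the threshold
    return {k: v for k, v in counts.items() if v >= threshold}
```

In the streaming version, the same truncation is what `window()` does for you, and a watermark on `ts` bounds how long each window's state is kept.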
Re: Need some guidance
Thanks, yes. I was using Int for my V and didn't get the second param in the second closure right :)

On Mon, Apr 13, 2015 at 1:55 PM, Dean Wampler deanwamp...@gmail.com wrote:
> [quoted text trimmed]
Re: Need some guidance
That appears to work, with a few changes to get the types correct:

input.distinct().combineByKey(
  (s: String) => 1,
  (agg: Int, s: String) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2)

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 3:24 PM, Victor Tso-Guillen v...@paxata.com wrote:
> [quoted text trimmed]
Re: Need some guidance
How about this?

input.distinct().combineByKey(
  (v: V) => 1,
  (agg: Int, x: Int) => agg + 1,
  (agg1: Int, agg2: Int) => agg1 + agg2).collect()

On Mon, Apr 13, 2015 at 10:31 AM, Dean Wampler deanwamp...@gmail.com wrote:
> [quoted text trimmed]
Re: Need some guidance
The problem with using collect is that it will fail for large data sets, as you'll attempt to copy the entire RDD to the memory of your driver program. The following works (Scala syntax, but similar to Python):

scala> val i1 = input.distinct.groupByKey
scala> i1.foreach(println)
(1,CompactBuffer(beta, alpha, foo))
(3,CompactBuffer(foo))
(2,CompactBuffer(alpha, bar))

scala> val i2 = i1.map(tup => (tup._1, tup._2.size))
scala> i2.foreach(println)
(1,3)
(3,1)
(2,2)

The i2 line passes a function that takes a tuple argument, then constructs a new output tuple with the first element and the size of the second (each CompactBuffer). An alternative pattern-match syntax would be:

scala> val i2 = i1.map { case (key, buffer) => (key, buffer.size) }

This should work as long as none of the CompactBuffers are too large, which could happen for extremely large data sets.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Mon, Apr 13, 2015 at 11:45 AM, Marco Shaw marco.s...@gmail.com wrote:
> [quoted text trimmed]
Need some guidance
**Learning the ropes**

I'm trying to grasp the concept of using the pipeline in pySpark...

Simplified example:

list=[(1,alpha),(1,beta),(1,foo),(1,alpha),(2,alpha),(2,alpha),(2,bar),(3,foo)]

Desired outcome: [(1,3),(2,2),(3,1)]

Basically, for each key I want the number of unique values.

I've tried different approaches, but am I really using Spark effectively? I wondered if I would do something like:

input=sc.parallelize(list)
input.groupByKey().collect()

Then I wondered if I could do something like a foreach over each key value, and then map the actual values and reduce them. Pseudo-code:

input.groupbykey()
  .keys
  .foreach(_.values
    .map(lambda x: x,1)
    .reducebykey(lambda a,b: a+b)
    .count()
  )

I was somehow hoping that the key would get the current value of count, and thus be the count of the unique keys, which is exactly what I think I'm looking for. Am I way off base on how I could accomplish this?

Marco
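Since Marco's setting is PySpark, here is a sketch of the same distinct-count-per-key idea in Python. The RDD version (in comments, since it needs a SparkContext) mirrors Dean's Scala answer; the plain-Python function below it can be run without a cluster to verify the desired outcome on the sample data.

```python
# With a SparkContext `sc` (not created here), the PySpark equivalent of the
# Scala answers in this thread would be roughly:
#   result = (sc.parallelize(data)
#               .distinct()                       # dedupe (key, value) pairs
#               .map(lambda kv: (kv[0], 1))       # each surviving pair counts once
#               .reduceByKey(lambda a, b: a + b)  # sum counts per key
#               .collect())

def unique_values_per_key(pairs):
    """Plain-Python equivalent: number of distinct values for each key."""
    seen = {}
    for k, v in pairs:
        seen.setdefault(k, set()).add(v)
    return sorted((k, len(vs)) for k, vs in seen.items())

data = [(1, "alpha"), (1, "beta"), (1, "foo"), (1, "alpha"),
        (2, "alpha"), (2, "alpha"), (2, "bar"), (3, "foo")]
print(unique_values_per_key(data))  # -> [(1, 3), (2, 2), (3, 1)]
```

The `distinct()`-then-count shape is the key insight from the thread: once duplicate pairs are removed, "unique values per key" reduces to an ordinary count per key.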
Re: guidance on simple unit testing with Spark
If you don't want to refactor your code, you can put your input into a test file. After the test runs, read the data from the output file you specified (you probably want this to be a temp file, deleted on exit). Of course, that is not really a unit test - Matei's suggestion is preferable (this is how we test). However, if you have a long and complex flow, you might unit test different parts, and then have an integration test which reads from the files and tests the whole flow together (I do this as well).

On Fri, Jun 13, 2014 at 10:04 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

You need to factor your program so that it's not just a main(). This is not a Spark-specific issue; it's about how you'd unit test any program in general. In this case, your main() creates a SparkContext, so you can't pass one in from outside, and your code has to read data from a file and write it to a file. It would be better to move your code for transforming data into a new function:

def processData(lines: RDD[String]): RDD[String] = {
  // build and return your "res" variable
}

Then you can unit-test this directly on data you create in your program:

val myLines = sc.parallelize(Seq("line 1", "line 2"))
val result = GetInfo.processData(myLines).collect()
assert(result.toSet === Set("res 1", "res 2"))

Matei

On Jun 13, 2014, at 2:42 PM, SK skrishna...@gmail.com wrote:

Hi,

I have looked through some of the test examples and also the brief documentation on unit testing at http://spark.apache.org/docs/latest/programming-guide.html#unit-testing, but still don't have a good understanding of writing unit tests using the Spark framework. Previously, I have written unit tests using the specs2 framework and have got them to work in Scalding. I tried to use the specs2 framework with Spark, but could not find any simple examples I could follow. I am open to specs2 or FunSuite, whichever works best with Spark. I would like some additional guidance, or some simple sample code using specs2 or FunSuite.
My code is provided below. I have the following code in src/main/scala/GetInfo.scala. It reads a JSON file and extracts some data. It takes the input file (args(0)) and output file (args(1)) as arguments.

object GetInfo {
  def main(args: Array[String]) {
    val inp_file = args(0)
    val conf = new SparkConf().setAppName("GetInfo")
    val sc = new SparkContext(conf)
    val res = sc.textFile(inp_file)
      .map(line => parse(line))
      .map(json => {
        implicit lazy val formats = org.json4s.DefaultFormats
        val aid = (json \ "d" \ "TypeID").extract[Int]
        val ts = (json \ "d" \ "TimeStamp").extract[Long]
        val gid = (json \ "d" \ "ID").extract[String]
        (aid, ts, gid)
      })
      .groupBy(tup => tup._3)
      .sortByKey(true)
      .map(g => (g._1, g._2.map(_._2).max))
    res.map(tuple => "%s, %d".format(tuple._1, tuple._2)).saveAsTextFile(args(1))
  }
}

I would like to test the above code. My unit test is in src/test/scala. The code I have so far for the unit test appears below:

import org.apache.spark._
import org.specs2.mutable._

class GetInfoTest extends Specification with java.io.Serializable {
  val data = List(
    "d: {TypeID = 10, Timestamp: 1234, ID: ID1}",
    "d: {TypeID = 11, Timestamp: 5678, ID: ID1}",
    "d: {TypeID = 10, Timestamp: 1357, ID: ID2}",
    "d: {TypeID = 11, Timestamp: 2468, ID: ID2}"
  )

  val expected_out = List(
    ("ID1", 5678),
    ("ID2", 2468)
  )

  "A GetInfo job" should {
    //*** How do I pass the data defined above as the input and output that GetInfo expects as arguments? ***
    val sc = new SparkContext("local", "GetInfo")
    //*** how do I get the output? ***
    // assuming out_buffer has the output, I want to match it to the expected output
    "match expected output" in {
      (out_buffer == expected_out) must beTrue
    }
  }
}

I would like some help with the tasks marked with *** in the unit test code above. If specs2 is not the right way to go, I am also open to FunSuite. I would like to know how to pass the input while calling my program from the unit test, and how to get the output. Thanks for your help.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/guidance-on-simple-unit-testing-with-Spark-tp7604.html Sent from the Apache Spark User List mailing list archive at Nabble.com. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.io W: www.velos.io
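Matei's refactoring advice - pull the transformation out of main() so the test can feed it in-memory data and assert on the returned value - is language-agnostic. A minimal Python sketch of the same pattern (the `process_data` function and its "id,timestamp" line format are hypothetical stand-ins for SK's Scala processData, which keeps the max timestamp per ID):

```python
def process_data(lines):
    """Pure transformation: no file I/O, no SparkContext.

    Takes an iterable of 'id,timestamp' lines and returns, for each id,
    the maximum timestamp, sorted by id -- mirroring what GetInfo computes.
    """
    latest = {}
    for line in lines:
        gid, ts_str = line.split(",")
        ts = int(ts_str)
        if gid not in latest or ts > latest[gid]:
            latest[gid] = ts
    return sorted(latest.items())

# The unit test feeds in-memory data and asserts on the returned value,
# exactly as Matei suggests with sc.parallelize(...) + collect():
result = process_data(["ID1,1234", "ID1,5678", "ID2,1357", "ID2,2468"])
assert result == [("ID1", 5678), ("ID2", 2468)]
```

main() then shrinks to reading args(0), calling the pure function, and writing args(1), so only the thin I/O shell is left untested by unit tests (and covered by an integration test, per Daniel's suggestion).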