Dynamic executor scaling spark/Kubernetes
Hello,

Is dynamic executor scaling for Spark on Kubernetes available in the latest release of Spark? I mean scaling the executors based on the workload, as opposed to preallocating a fixed number of executors for a Spark job.

Thanks,
Purna
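For context, the two styles being compared, expressed as spark-submit flags. The static form works on 2.3/2.4; as far as I recall the Kubernetes backend in those versions cannot do dynamic allocation because there is no external shuffle service for it, and the shuffle-tracking flag shown only exists from Spark 3.x on, so treat the second block as a forward-looking sketch rather than something to run on 2.x (all host names and values are placeholders):

# Static preallocation: a fixed number of executors for the whole job
spark-submit --master k8s://https://<k8s-apiserver>:6443 --deploy-mode cluster \
  --conf spark.executor.instances=5 \
  ...

# Dynamic allocation: executors scale with the workload (Spark 3.x on K8s)
spark-submit --master k8s://https://<k8s-apiserver>:6443 --deploy-mode cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=1 \
  --conf spark.dynamicAllocation.maxExecutors=10 \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  ...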
Re: [ANNOUNCE] Announcing Apache Spark 2.4.0
Thanks, this is great news. Can you please let me know if dynamic resource allocation is available in Spark 2.4? I'm using Spark 2.3.2 on Kubernetes; do I still need to provide executor memory options as part of the spark-submit command, or will Spark manage the required executor memory based on the size of the Spark job?

On Thu, Nov 8, 2018 at 2:18 PM Marcelo Vanzin wrote:
> +user@
>
> -- Forwarded message -
> From: Wenchen Fan
> Date: Thu, Nov 8, 2018 at 10:55 PM
> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
> To: Spark dev list
>
> Hi all,
>
> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release adds Barrier Execution Mode for better integration with deep learning frameworks, introduces 30+ built-in and higher-order functions to deal with complex data types more easily, and improves the K8s integration, along with experimental Scala 2.12 support. Other major updates include the built-in Avro data source, Image data source, flexible streaming sinks, elimination of the 2GB block size limitation during transfer, and Pandas UDF improvements. In addition, this release continues to focus on usability, stability, and polish while resolving around 1100 tickets.
>
> We'd like to thank our contributors and users for their contributions and early feedback to this release. This release would not have been possible without you.
>
> To download Spark 2.4.0, head over to the download page: http://spark.apache.org/downloads.html
>
> To view the release notes: https://spark.apache.org/releases/spark-release-2-4-0.html
>
> Thanks,
> Wenchen
>
> PS: If you see any issues with the release notes, webpage or published artifacts, please contact me directly off-list.
>
> --
> Marcelo
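On the memory question: the 2.3.x/2.4 Kubernetes backend does not size executors based on the job; executor memory, cores and instance count still have to be set explicitly on spark-submit. A bare-bones sketch (the class name, image and jar path below are placeholders):

spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --name my-job \
  --class com.example.MyJob \
  --conf spark.executor.instances=5 \
  --conf spark.executor.memory=4g \
  --conf spark.executor.cores=2 \
  --conf spark.driver.memory=2g \
  --conf spark.kubernetes.container.image=<my-spark-image> \
  local:///opt/spark/jars/my-job.jar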
Spark 2.3.1: k8s driver pods stuck in Initializing state
Hello,

We're running Spark 2.3.1 on Kubernetes v1.11.0 and our driver pods are getting stuck in the initializing state, like so:

NAME                                             READY  STATUS     RESTARTS  AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver   0/1    Init:0/1   0         18h

And from *kubectl describe pod*:

*Warning FailedMount 9m (x128 over 4h)* kubelet, 10.47.96.167  Unable to mount volumes for pod "my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)": timeout expired waiting for volumes to attach or mount for pod "spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted volumes=[spark-init-properties]. list of unattached volumes=[spark-init-properties download-jars-volume download-files-volume spark-token-tfpvp]

*Warning FailedMount 4m (x153 over 4h) kubelet,* 10.47.96.167  MountVolume.SetUp failed for volume "spark-init-properties" : configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in *kubectl get configmap*, the init config map for the driver pod isn't there. Am I correct in assuming that since the configmap isn't being created, the driver pod will never start (hence stuck in init)? Where does the init config map come from? Why would it not be created?

Please suggest.

Thanks,
Purna
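A few kubectl checks that can help narrow this down (the namespace is a placeholder, and the init container name spark-init is what the 2.3 images use, if memory serves):

kubectl get configmaps -n spark
kubectl get events -n spark --sort-by=.metadata.creationTimestamp
kubectl describe pod my-pod-fd79926b819d3b34b05250e23347d0e7-driver -n spark
# init container logs, if the init container ever started
kubectl logs my-pod-fd79926b819d3b34b05250e23347d0e7-driver -n spark -c spark-init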
Spark 2.3.1: k8s driver pods stuck in Initializing state
We're running Spark 2.3.1 on Kubernetes v1.11.0 and our driver pods are getting stuck in the initializing state, like so:

NAME                                             READY  STATUS     RESTARTS  AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver   0/1    Init:0/1   0         18h

And from *kubectl describe pod*:

*Warning FailedMount 9m (x128 over 4h)* kubelet, 10.47.96.167  Unable to mount volumes for pod "my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)": timeout expired waiting for volumes to attach or mount for pod "spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted volumes=[spark-init-properties]. list of unattached volumes=[spark-init-properties download-jars-volume download-files-volume spark-token-tfpvp]

*Warning FailedMount 4m (x153 over 4h) kubelet,* 10.47.96.167  MountVolume.SetUp failed for volume "spark-init-properties" : configmaps "my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in *kubectl get configmap*, the init config map for the driver pod isn't there. Am I correct in assuming that since the configmap isn't being created, the driver pod will never start (hence stuck in init)? Where does the init config map come from? Why would it not be created?

Thanks,
Christopher Carney
Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
Resurfacing the question to get more attention. This happens with executor pods as well: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128

(The full description, kubectl version output, and kubectl describe node output are in the original post below.)
Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
Hello,

I'm running a Spark 2.3 job on a Kubernetes cluster. When I run spark-submit against the k8s master, the driver pod gets stuck in the Waiting: PodInitializing state. I had to manually kill the driver pod and submit a new job, and then it works. How can this be handled in production?

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128

(The kubectl version output and kubectl describe node output are in the original post below.)
spark driver pod stuck in Waiting: PodInitializing state in Kubernetes
I'm running a Spark 2.3 job on a Kubernetes cluster.

kubectl version

Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}

When I run spark-submit against the k8s master, the driver pod gets stuck in the Waiting: PodInitializing state. I had to manually kill the driver pod and submit a new job, and then it works. This happens if I submit the jobs almost in parallel, i.e. submit 5 jobs one after the other simultaneously.

I'm running Spark jobs on 20 nodes, each with the configuration below. I ran kubectl describe node on the node where the driver pod is running, and this is what I got. I do see that resources on the node are overcommitted, but I expected the Kubernetes scheduler not to schedule pods if the node's resources are overcommitted or the node is in Not Ready state. In this case the node is in Ready state, but I observe the same behaviour when a node is in "Not Ready" state.

Name:               **
Roles:              worker
Labels:             beta.kubernetes.io/arch=amd64
                    beta.kubernetes.io/os=linux
                    kubernetes.io/hostname=
                    node-role.kubernetes.io/worker=true
Annotations:        node.alpha.kubernetes.io/ttl=0
                    volumes.kubernetes.io/controller-managed-attach-detach=true
Taints:
CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
Conditions:
  Type            Status  LastHeartbeatTime                LastTransitionTime               Reason                      Message
  ----            ------  -----------------                ------------------               ------                      -------
  OutOfDisk       False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasSufficientDisk    kubelet has sufficient disk space available
  MemoryPressure  False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasSufficientMemory  kubelet has sufficient memory available
  DiskPressure    False   Tue, 14 Aug 2018 09:31:20 -0400  Tue, 31 Jul 2018 09:59:24 -0400  KubeletHasNoDiskPressure    kubelet has no disk pressure
  Ready           True    Tue, 14 Aug 2018 09:31:20 -0400  Sat, 11 Aug 2018 00:41:27 -0400  KubeletReady                kubelet is posting ready status. AppArmor enabled
Addresses:
  InternalIP:  *
  Hostname:    **
Capacity:
  cpu:     16
  memory:  125827288Ki
  pods:    110
Allocatable:
  cpu:     16
  memory:  125724888Ki
  pods:    110
System Info:
  Machine ID:                 *
  System UUID:                **
  Boot ID:                    1493028d-0a80-4f2f-b0f1-48d9b8910e9f
  Kernel Version:             4.4.0-1062-aws
  OS Image:                   Ubuntu 16.04.4 LTS
  Operating System:           linux
  Architecture:               amd64
  Container Runtime Version:  docker://Unknown
  Kubelet Version:            v1.8.3
  Kube-Proxy Version:         v1.8.3
PodCIDR:     **
ExternalID:  **
Non-terminated Pods: (11 in total)
  Namespace    Name                                                          CPU Requests  CPU Limits  Memory Requests  Memory Limits
  ---------    ----                                                          ------------  ----------  ---------------  -------------
  kube-system  calico-node-gj5mb                                             250m (1%)     0 (0%)      0 (0%)           0 (0%)
  kube-system  kube-proxy-                                                   100m (0%)     0 (0%)      0 (0%)           0 (0%)
  kube-system  prometheus-prometheus-node-exporter-9cntq                     100m (0%)     200m (1%)   30Mi (0%)        50Mi (0%)
  logging      elasticsearch-elasticsearch-data-69df997486-gqcwg             400m (2%)     1 (6%)      8Gi (6%)         16Gi (13%)
  logging      fluentd-fluentd-elasticsearch-tj7nd                           200m (1%)     0 (0%)      612Mi (0%)       0 (0%)
  rook         rook-agent-6jtzm                                              0 (0%)        0 (0%)      0 (0%)           0 (0%)
  rook         rook-ceph-osd-10-6-42-250.accel.aws-cardda.cb4good.com-gwb8j  0 (0%)        0 (0%)      0 (0%)           0 (0%)
  spark        accelerate-test-5-a3bfb8a597e83d459193a183e17f13b5-exec-1     2 (12%)       0 (0%)      10Gi (8%)        12Gi (10%)
  spark        accelerate-testing-1-8ed0482f3bfb3c0a83da30bb7d433dff-exec-5  2 (12%)       0 (0%)      10Gi (8%)        12Gi
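Not part of the original message, but one mitigation when several jobs land at once is to give the driver and executors explicit CPU and memory settings so the pods' requests and limits reflect reality and the scheduler can account for them. A sketch with placeholder values (the spark.kubernetes.*.limit.cores properties are in the 2.3 docs; the request side comes from spark.driver.cores / spark.executor.cores and the memory settings):

spark-submit ... \
  --conf spark.driver.cores=1 \
  --conf spark.driver.memory=2g \
  --conf spark.kubernetes.driver.limit.cores=1 \
  --conf spark.executor.instances=3 \
  --conf spark.executor.cores=2 \
  --conf spark.executor.memory=4g \
  --conf spark.kubernetes.executor.limit.cores=2 \
  ...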
Re: Executor lost for unknown reasons error Spark 2.3 on kubernetes
$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
 at java.lang.Thread.run(Thread.java:748)
2018-07-30 19:58:42 INFO BlockManagerMasterEndpoint:54 - Trying to remove executor 7 from BlockManagerMaster.
2018-07-30 19:58:42 WARN BlockManagerMasterEndpoint:66 - No more replicas available for rdd_9_37 !
MasterEndpoint:54 - Removing block manager BlockManagerId(7, 10.*.*.*.*, 43888, None)
2018-07-30 19:58:42 INFO BlockManagerMaster:54 - Removed 7 successfully in removeExecutor
2018-07-30 19:58:42 INFO DAGScheduler:54 - Shuffle files lost for executor: 7 (epoch 1)
2018-07-30 19:58:42 ERROR ContextCleaner:91 - Error cleaning broadcast 11
org.apache.spark.SparkException: Exception thrown in awaitResult:
 at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
 at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
 at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:155)
 at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:321)
 at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
 at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)
 at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:238)
 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:194)
 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:185)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:185)
 at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)
 at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)
 at org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)
Caused by: java.io.IOException: Connection reset by peer
 at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
 at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
 at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
 at sun.nio.ch.IOUtil.read(IOUtil.java:192)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
 at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
 at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
 at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
 at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
 at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
 at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
 at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
 at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
 at java.lang.Thread.run(Thread.java:

On Tue, Jul 31, 2018 at 8:32 AM purna pradeep wrote:
(quoting the original message, which appears in full below)
Executor lost for unknown reasons error Spark 2.3 on kubernetes
Hello,

I’m getting the error below in the Spark driver pod logs; executor pods are getting killed midway while the job is running, and even the driver pod terminates with the intermittent error below. This happens if I run multiple jobs in parallel.

I'm not able to see the executor logs because the executor pods are killed.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Executor lost for unknown reasons.
Driver stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
 at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
 at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
 at scala.Option.foreach(Option.scala:257)
 at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
 at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
 at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
 at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
 ... 42 more

Thanks,
Purna
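One thing worth checking in this situation (a suggestion, not something stated in the thread): "Executor lost for unknown reasons" on Kubernetes often turns out to be executor containers OOM-killed by the kubelet, which leaves no executor logs behind. A quick check plus the relevant knob (the property name differs by version: spark.kubernetes.executor.memoryOverhead in the 2.3 K8s docs, spark.executor.memoryOverhead in later releases; names and namespaces below are placeholders):

# look for "Reason: OOMKilled" under Last State of the executor container
kubectl describe pod <executor-pod-name> -n <namespace>

spark-submit ... \
  --conf spark.executor.memory=4g \
  --conf spark.kubernetes.executor.memoryOverhead=1g \
  ...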
Spark 2.3 Kubernetes error
Hello,

When I try to set the options below on the spark-submit command against the k8s master, I get the error below in the spark-driver pod logs:

--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

But when I set these extra Java options as system properties inside the Spark application jar, everything works fine.

2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext. org.apache.spark.SparkException: External scheduler cannot be instantiated at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747) at org.apache.spark.SparkContext.init(SparkContext.scala:492) at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486) at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930) at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921) Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [test-657e2f715ada3f91ae32c588aa178f63-driver] in namespace: [test] failed. at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62) at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228) at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184) at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70) at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120) at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741) ... 
12 more Caused by: javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at sun.security.ssl.Alerts.getSSLException(Alerts.java:192) at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959) at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302) at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296) at sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514) at sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216) at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026) at sun.security.ssl.Handshaker.process_record(Handshaker.java:961) at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072) at sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385) at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413) at sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397) at okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281) at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251) at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151) at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195) at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121) at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100) at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at
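Not from the thread, but a likely explanation worth checking: setting https.proxyHost/https.proxyPort as JVM-wide options makes the driver's fabric8 Kubernetes client reach the API server through the proxy as well, and the proxy's certificate then fails PKIX validation (which is exactly the error above). A common workaround is to exempt in-cluster addresses from proxying via http.nonProxyHosts; the exact host list depends on how the API server is addressed in your cluster, so the values below are placeholders. It is also worth double-checking the stray extra dash in --Dhttps.proxyHost on the driver line of the original command.

--conf spark.driver.extraJavaOptions="-Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttps.protocols=TLSv1.2 -Dhttp.nonProxyHosts=kubernetes.default.svc|*.svc|*.cluster.local|<k8s-apiserver-host>" \
--conf spark.executor.extraJavaOptions="-Dhttps.proxyHost=myhost -Dhttps.proxyPort=8099 -Dhttps.protocols=TLSv1.2 -Dhttp.nonProxyHosts=kubernetes.default.svc|*.svc|*.cluster.local|<k8s-apiserver-host>" \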
Spark 2.3 driver pod stuck in Running state — Kubernetes
Hello,

When I run spark-submit on the k8s cluster, I see the driver pod stuck in the Running state, and when I pull the driver pod logs I see the lines below. I understand this warning might be because of a lack of CPU/memory, but I would expect the driver pod to be in the "Pending" state rather than the "Running" state, since it is not actually running anything. So I had to kill the driver pod and resubmit the job.

Please suggest here!

2018-06-08 14:38:01 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:16 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:31 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:38:46 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2018-06-08 14:39:01 WARN TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
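For what it's worth, that warning means the driver itself is up but no executors have registered with it, so the driver container legitimately shows Running while the executor pods are the ones stuck. A couple of checks that usually narrow it down (the namespace and pod names are placeholders; spark-role is the label the 2.3 Kubernetes backend puts on its pods):

kubectl get pods -n <namespace> -l spark-role=executor
kubectl describe pod <executor-pod-name> -n <namespace>
# in the Events section, look for FailedScheduling with "Insufficient cpu" or "Insufficient memory"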
spark partitionBy with partitioned column in json output
I'm reading the JSON below in Spark:

{"bucket": "B01", "actionType": "A1", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL", "postaction": "NULL"}

val df = spark.read.json("actions.json").toDF()

Now I'm writing the same data to a JSON output as below:

df.write.format("json").mode("append").partitionBy("bucket", "actionType").save("output.json")

and the output.json is as below:

{"preaction":"NULL","postaction":"NULL"}

The bucket and actionType columns are missing in the JSON output; I need the partitionBy columns in the output as well.
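This is expected behaviour: partitionBy moves the partition columns out of the data files and into the directory names (bucket=B01/actionType=A1/...), and they are recovered again when the output directory is read back with Spark. If the columns also need to be present inside each JSON record, one option is to duplicate them before writing and partition on the copies; a sketch, where the *_part column names are arbitrary:

import org.apache.spark.sql.functions.col

val df = spark.read.json("actions.json")

// Duplicate the partition columns so the originals stay inside every JSON record;
// only the copied *_part columns end up in the directory structure.
df.withColumn("bucket_part", col("bucket"))
  .withColumn("actionType_part", col("actionType"))
  .write
  .format("json")
  .mode("append")
  .partitionBy("bucket_part", "actionType_part")
  .save("output")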
Re: Spark 2.3 error on Kubernetes
Anirudh,

Thanks for your response. I'm running the k8s cluster on AWS, and the kube-dns pods are running fine. Also, as I mentioned, only 1 executor pod is running even though I requested 5; the other 4 were killed with the error below, and I do have enough resources available.

On Tue, May 29, 2018 at 6:28 PM Anirudh Ramanathan wrote:
> This looks to me like a kube-dns error that's causing the driver DNS address to not resolve.
> It would be worth double checking that kube-dns is indeed running (in the kube-system namespace).
> Often, with environments like minikube, kube-dns may exit/crashloop due to lack of resources.
>
> On Tue, May 29, 2018 at 3:18 PM, purna pradeep wrote:
> (quoting the original message, which appears in full below)
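A few commands that can help confirm or rule out DNS as the culprit (namespaces are placeholders, and the busybox pod is just a throwaway for testing resolution of the driver service):

kubectl get pods -n kube-system -l k8s-app=kube-dns
kubectl logs -n kube-system -l k8s-app=kube-dns
# the headless driver service, e.g. spark-...-driver-svc, should exist here
kubectl get svc -n spark
kubectl run dns-test --rm -it --image=busybox --restart=Never -- \
  nslookup spark-1527629824987-driver-svc.spark.svc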
Spark 2.3 error on Kubernetes
Hello, I’m getting below error when I spark-submit a Spark 2.3 app on Kubernetes *v1.8.3* , some of the executor pods were killed with below error as soon as they come up Exception in thread "main" java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ... 4 more Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187) at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc at java.net.InetAddress.getAllByName0(InetAddress.java:1280) at java.net.InetAddress.getAllByName(InetAddress.java:1192) at java.net.InetAddress.getAllByName(InetAddress.java:1126) at java.net.InetAddress.getByName(InetAddress.java:1076) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143) at java.security.AccessController.doPrivileged(Native Method) at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143) at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32) at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108) at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208) at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49) at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188) at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82) at
Spark 2.3 error on kubernetes
Hello, I’m getting below intermittent error when I spark-submit a Spark 2.3 app on Kubernetes v1.8.3 , some of the executor pods were killed with below error as soon as they come up Exception in thread "main" java.lang.reflect.UndeclaredThrowableException at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713) at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188) at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293) at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala) Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101) at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65) at org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698) ... 4 more Caused by: java.io.IOException: Failed to connect to spark-1527629824987-driver-svc.spark.svc:7078 at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245) at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187) at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194) at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.UnknownHostException: spark-1527629824987-driver-svc.spark.svc at java.net.InetAddress.getAllByName0(InetAddress.java:1280) at java.net.InetAddress.getAllByName(InetAddress.java:1192) at java.net.InetAddress.getAllByName(InetAddress.java:1126) at java.net.InetAddress.getByName(InetAddress.java:1076) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146) at io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143) at java.security.AccessController.doPrivileged(Native Method) at io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143) at io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63) at io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57) at io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32) at io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108) at io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208) at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49) at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188) at io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174) at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507) at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481) at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420) at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104) at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82) at
Spark driver pod garbage collection
Hello, Currently I observe that dead pods (i.e. Spark driver pods which have completed execution) are not getting garbage collected, so pods could potentially sit in the namespace for weeks. This makes listing, parsing, and reading pods slower, as well as leaving junk sitting on the cluster. I believe the minimum-container-ttl-duration kubelet flag is set to 0 minutes by default, but I don't see the completed Spark driver pods being garbage collected. Do I need to set any flag explicitly at the kubelet level?
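Worth noting: the kubelet container GC settings (minimum-container-ttl-duration and friends) only prune dead containers on each node; they never delete the pod objects from the API server, which is why completed driver pods stay listed in the namespace. As a stopgap, and assuming the driver pods carry the spark-role=driver label that spark-submit in cluster mode normally applies (the "spark" namespace below is a placeholder), they can be cleaned up on a schedule with something like:

kubectl delete pods --namespace spark -l spark-role=driver --field-selector=status.phase=Succeeded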
Spark driver pod eviction Kubernetes
Hi, What would be the recommended approach to let the Spark driver pod finish the currently running job before it gets evicted to another node while maintenance (kernel upgrade, hardware maintenance, etc.) is going on on its current node via the drain command? I don't think I can use a PodDisruptionBudget, since the Spark pods are created by Kubernetes/spark-submit itself rather than from a Deployment yaml that I control. Please suggest!
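One workaround worth considering (my suggestion, not from this thread; the node name and the spark-role=driver label are assumptions about the setup) is to cordon the node first so no new drivers land on it, wait until the driver pods running there have completed, and only then drain:

kubectl cordon <node-name>
kubectl get pods --all-namespaces -o wide -l spark-role=driver --field-selector spec.nodeName=<node-name>
# repeat the get until no driver pod on the node is still Running, then:
kubectl drain <node-name> --ignore-daemonsets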
Oozie with spark 2.3 in Kubernetes
Hello, I would like to know if anyone has tried Oozie with Spark 2.3 actions on Kubernetes for scheduling Spark jobs. Thanks, Purna
Re: Scala program to spark-submit on k8 cluster
Yes, a “REST application that submits a Spark job to a k8s cluster by running spark-submit programmatically”, and I would also like to expose it as a Kubernetes service so that clients can access it like any other REST API. On Wed, Apr 4, 2018 at 12:25 PM Yinan Li wrote: > Hi Kittu, > > What do you mean by "a Scala program"? Do you mean a program that submits > a Spark job to a k8s cluster by running spark-submit programmatically, or > some example Scala application that is to run on the cluster? > > On Wed, Apr 4, 2018 at 4:45 AM, Kittu M wrote: >> Hi, >> >> I’m looking for a Scala program to spark submit a Scala application >> (spark 2.3 job) on k8 cluster . >> >> Any help would be much appreciated. Thanks >> >> >> >
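For the programmatic part, one route (a minimal sketch, not from this thread; the master URL, image, jar path and main class below are placeholders) is Spark's launcher API, which shells out to spark-submit and gives the REST service a handle to track the submission:

import org.apache.spark.launcher.SparkLauncher

// Launch spark-submit against the k8s master and keep a handle for status polling.
val handle = new SparkLauncher()
  .setMaster("k8s://https://<k8s-apiserver>:443")
  .setDeployMode("cluster")
  .setMainClass("com.example.MySparkApp")
  .setAppResource("local:///opt/spark/jars/my-app.jar")
  .setConf("spark.kubernetes.container.image", "my-repo/spark:2.3.0")
  .startApplication()

// handle.getState can then be exposed by the REST endpoint that wraps this call.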
unsubscribe
unsubscribe
unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster
Thanks Yinan, it looks like this is still in alpha. I would like to know if there is any REST interface for Spark 2.3 job submission similar to Spark 2.2, as I need to submit Spark applications to the k8s master based on different events (cron or S3 file-based triggers). On Tue, Mar 20, 2018 at 11:50 PM Yinan Li <liyinan...@gmail.com> wrote: > One option is the Spark Operator > <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>. It allows > specifying and running Spark applications on Kubernetes using Kubernetes > custom resources objects. It takes SparkApplication CRD objects and > automatically submits the applications to run on a Kubernetes cluster. > > Yinan > > On Tue, Mar 20, 2018 at 7:47 PM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Im using kubernetes cluster on AWS to run spark jobs ,im using spark 2.3 >> ,now i want to run spark-submit from AWS lambda function to k8s >> master,would like to know if there is any REST interface to run Spark >> submit on k8s Master > > >
Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster
I'm using a Kubernetes cluster on AWS to run Spark jobs with Spark 2.3. I now want to run spark-submit from an AWS Lambda function against the k8s master, and would like to know if there is any REST interface to run spark-submit on the k8s master.
Re: Spark 2.3 submit on Kubernetes error
Thanks Yinan, I’m able to get kube-dns endpoints when I ran this command kubectl get ep kube-dns —namespace=kube-system Do I need to deploy under kube-system instead of default namespace And please lemme know if you have any insights on Error1 ? On Sun, Mar 11, 2018 at 8:26 PM Yinan Li <liyinan...@gmail.com> wrote: > Spark on Kubernetes requires the presence of the kube-dns add-on properly > configured. The executors connect to the driver through a headless > Kubernetes service using the DNS name of the service. Can you check if you > have the add-on installed in your cluster? This issue > https://github.com/apache-spark-on-k8s/spark/issues/558 might help. > > > On Sun, Mar 11, 2018 at 5:01 PM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Getting below errors when I’m trying to run spark-submit on k8 cluster >> >> >> *Error 1*:This looks like a warning it doesn’t interrupt the app running >> inside executor pod but keeps on getting this warning >> >> >> *2018-03-09 11:15:21 WARN WatchConnectionManager:192 - Exec Failure* >> *java.io.EOFException* >> * at >> okio.RealBufferedSource.require(RealBufferedSource.java:60)* >> * at >> okio.RealBufferedSource.readByte(RealBufferedSource.java:73)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.WebSocketReader.readHeader(WebSocketReader.java:113)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.WebSocketReader.processNextFrame(WebSocketReader.java:97)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.RealWebSocket.loopReader(RealWebSocket.java:262)* >> * at okhttp3.internal.ws >> <http://okhttp3.internal.ws>.RealWebSocket$2.onResponse(RealWebSocket.java:201)* >> * at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)* >> * at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)* >> * at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)* >> * at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)* >> * at java.lang.Thread.run(Thread.java:748)* >> >> >> >> *Error2:* This is intermittent error which is failing the executor pod >> to run >> >> >> *org.apache.spark.SparkException: External scheduler cannot be >> instantiated* >> * at >> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)* >> * at org.apache.spark.SparkContext.(SparkContext.scala:492)* >> * at >> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)* >> * at >> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)* >> * at >> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)* >> * at scala.Option.getOrElse(Option.scala:121)* >> * at >> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)* >> * at >> com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)* >> * at >> com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)* >> * at >> com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)* >> * at >> com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)* >> * at >> com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)* >> *Caused by: io.fabric8.kubernetes.client.KubernetesClientException: >> Operation: [get] for kind: [Pod] with name: >> [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver] in namespace: >> [default] failed.* >> * at >> 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)* >> * at >> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)* >> * at >> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)* >> * at >> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)* >> * at >> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70)* >> * at >> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)* >> * at >> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$cr
Spark 2.3 submit on Kubernetes error
Getting below errors when I’m trying to run spark-submit on k8 cluster *Error 1*:This looks like a warning it doesn’t interrupt the app running inside executor pod but keeps on getting this warning *2018-03-09 11:15:21 WARN WatchConnectionManager:192 - Exec Failure* *java.io.EOFException* * at okio.RealBufferedSource.require(RealBufferedSource.java:60)* * at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)* * at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)* * at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)* * at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)* * at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)* * at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)* * at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)* * at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)* * at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)* * at java.lang.Thread.run(Thread.java:748)* *Error2:* This is intermittent error which is failing the executor pod to run *org.apache.spark.SparkException: External scheduler cannot be instantiated* * at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)* * at org.apache.spark.SparkContext.(SparkContext.scala:492)* * at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)* * at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)* * at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)* * at scala.Option.getOrElse(Option.scala:121)* * at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)* * at com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)* * at com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)* * at com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)* * at com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)* * at com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)* *Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [Pod] with name: [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver] in namespace: [default] failed.* * at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)* * at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)* * at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)* * at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)* * at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70)* * at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)* * at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)* * ... 
11 more* *Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try again* * at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)* * at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)* * at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)* * at java.net.InetAddress.getAllByName0(InetAddress.java:1276)* * at java.net.InetAddress.getAllByName(InetAddress.java:1192)* * at java.net.InetAddress.getAllByName(InetAddress.java:1126)* * at okhttp3.Dns$1.lookup(Dns.java:39)* * at okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)* * at okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)* * at okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)* * at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)* * at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)* * at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)* * at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)* * at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)* * at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)* * at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)* * at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)* *
handling Remote dependencies for spark-submit in spark 2.3 with kubernetes
I'm trying to run spark-submit against a Kubernetes cluster using the Spark 2.3 Docker container image. The challenge I'm facing is that the application has a main application jar plus other dependency files & jars located in a remote location such as AWS S3. Per the Spark 2.3 documentation there is a Kubernetes init-container that downloads remote dependencies, but in this case I'm not creating any pod spec where I could declare init-containers; per the documentation, Spark 2.3 on Kubernetes creates the pods (driver, executor) internally. So I'm not sure how I can use the init-container with spark-submit when there are remote dependencies. https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-remote-dependencies Please suggest
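My reading of the Spark 2.3 docs (a sketch, not a confirmed answer; the image names, URLs and class below are placeholders, and s3a:// URIs would additionally need the hadoop-aws/aws-sdk jars baked into the images) is that you don't author the init-container yourself: when the application jar, --jars or --files point at remote URIs, spark-submit injects the init-container into the driver and executor pods it creates, e.g.:

bin/spark-submit \
  --master k8s://https://<k8s-apiserver>:443 \
  --deploy-mode cluster \
  --class com.example.MainApplication \
  --conf spark.kubernetes.container.image=my-repo/spark:2.3.0 \
  --conf spark.kubernetes.initContainer.image=my-repo/spark:2.3.0 \
  --jars https://my-bucket.s3.amazonaws.com/deps/dep1.jar \
  --files https://my-bucket.s3.amazonaws.com/conf/app.conf \
  https://my-bucket.s3.amazonaws.com/jars/mainapplication.jar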
Unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Unsubscribe
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Unsubscribe
Unsubscribe
Executor not getting added SparkUI & Spark Eventlog in deploymode:cluster
Hi all, Im performing spark submit using Spark rest api POST operation on 6066 port with below config > Launch Command: > "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/jre/bin/java" > "-cp" "/usr/local/spark/conf/:/usr/local/spark/jars/*" "-Xmx4096M" > "-Dspark.eventLog.enabled=true" > "-Dspark.app.name=WorkflowApp" > "-Dspark.submit.deployMode=cluster" > "-Dspark.local.dir=/data0,/data1,/data2,/data3" > "-Dspark.executor.cores=2" "-Dspark.master=spark://:7077" > "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer" > "-Dspark.jars=s3a://<***>.jar" "-Dspark.driver.supervise=false" > "-Dspark.history.fs.logDirectory=s3a://<*>/" > "-Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256" > "-Dspark.driver.memory=4G" "-Dspark.executor.memory=4G" > "-Dspark.eventLog.dir=s3a://<*>/" > "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@<***>" > "/usr/local/spark/work/driver-<***>.jar" "MyApp" "-c" "s3a://<***>" when i looked into Spark eventlog below is what i observed {"Event":"SparkListenerExecutorAdded","Timestamp":1510633498623,"Executor ID":"driver","Executor Info":{"Host":"localhost","Total Cores":2,"Log Urls":{}}} "spark.master":"local[*]" Though i ran in deployMode as cluster the slave ip is not shown in Host section & spark.master is shown as local[*] above ,because of this the job is running only on driver and therefore when job is submitted its not showing up in http://:8080 under Running and Completed applications and it shows only under Running Drivers & Completed Drivers. Please suggest The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
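One thing I would check here (an assumption on my part, since the application code isn't shown): if WorkflowApp itself calls .master("local[*]") or setMaster when building its SparkConf/SparkSession, that setting takes precedence over what the REST submission passes, which would explain both the "spark.master":"local[*]" entry in the event log and the single executor running on the driver. A minimal sketch of leaving the master to the submitter:

import org.apache.spark.sql.SparkSession

// Do not hard-code a master here; spark.master then comes from the submission
// (spark://<master>:7077 with deployMode=cluster) instead of local[*].
val spark = SparkSession.builder()
  .appName("WorkflowApp")
  .getOrCreate()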
Spark http: Not showing completed apps
Hi, I'm using Spark standalone on AWS EC2, and I'm using the Spark REST API http://:8080/json to get completed apps, but the completed apps in the JSON come back as an empty array even though the job ran successfully.
Bulk load to HBase
We are on Hortonworks 2.5 and will very soon upgrade to 2.6; the Spark version is 1.6.2. We have a large volume of data that we bulk load into HBase using ImportTsv. The MapReduce job is very slow, and we are looking at whether we can use Spark to improve performance. Please let me know if this can be optimized with Spark and what packages or libs can be used.
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
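One route commonly used for this (a rough sketch under several assumptions: HBase 1.x client/mapreduce jars on the classpath, a single column family/qualifier, placeholder table name and paths, and region-aligned partitioning of the output glossed over) is to have Spark write HFiles via HFileOutputFormat2 and then hand them to HBase with LoadIncrementalHFiles, which skips the normal write path entirely:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object HBaseBulkLoadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hbase-bulk-load"))
    val hbaseConf = HBaseConfiguration.create()
    val tableName = TableName.valueOf("my_table")      // placeholder table
    val stagingDir = "/tmp/hfile-staging"              // placeholder HDFS staging dir

    val conn = ConnectionFactory.createConnection(hbaseConf)
    val regionLocator = conn.getRegionLocator(tableName)
    val job = Job.getInstance(hbaseConf)
    HFileOutputFormat2.configureIncrementalLoad(job, conn.getTable(tableName), regionLocator)

    // Parse the TSV into (rowkey, value), sort by row key (HFiles must be written
    // in sorted order) and emit one KeyValue per row for a single column.
    sc.textFile("/data/input.tsv")                     // placeholder input path
      .map(_.split("\t", -1))
      .map(fields => (fields(0), fields(1)))
      .sortByKey()
      .map { case (rowKey, value) =>
        val kv = new KeyValue(Bytes.toBytes(rowKey), Bytes.toBytes("cf"),
          Bytes.toBytes("col"), Bytes.toBytes(value))
        (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), kv)
      }
      .saveAsNewAPIHadoopFile(stagingDir, classOf[ImmutableBytesWritable],
        classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)

    // Hand the generated HFiles over to the region servers.
    new LoadIncrementalHFiles(hbaseConf)
      .doBulkLoad(new Path(stagingDir), conn.getAdmin, conn.getTable(tableName), regionLocator)

    conn.close()
    sc.stop()
  }
}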
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@Andres I need latest but it should less than 10 months based income_age column and don't want to use sql here On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote: > Hi, if you need the last value from income in window function you can use > last_value. > No tested but meaby with @ayan sql > > spark.sql("select *, row_number(), last_value(income) over (partition by > id order by income_age_ts desc) r from t") > > > On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> @ayan, >> >> Thanks for your response >> >> I would like to have functions in this case calculateIncome and the >> reason why I need function is to reuse in other parts of the application >> ..that's the reason I'm planning for mapgroups with function as argument >> which takes rowiterator ..but not sure if this is the best to implement as >> my initial dataframe is very large >> >> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote: >> >>> Hi >>> >>> the tool you are looking for is window function. Example: >>> >>> >>> df.show() >>> +++---+--+-+ >>> |JoinDate|dept| id|income|income_age_ts| >>> +++---+--+-+ >>> | 4/20/13| ES|101| 19000| 4/20/17| >>> | 4/20/13| OS|101| 1| 10/3/15| >>> | 4/20/12| DS|102| 13000| 5/9/17| >>> | 4/20/12| CS|102| 12000| 5/8/17| >>> | 4/20/10| EQ|103| 1| 5/9/17| >>> | 4/20/10| MD|103| 9000| 5/8/17| >>> +++---+--+-+ >>> >>> >>> res = spark.sql("select *, row_number() over (partition by id order >>> by income_age_ts desc) r from t") >>> >>> res.show() >>> +++---+--+-+---+ >>> |JoinDate|dept| id|income|income_age_ts| r| >>> +++---+--+-+---+ >>> | 4/20/10| EQ|103| 1| 5/9/17| 1| >>> | 4/20/10| MD|103| 9000| 5/8/17| 2| >>> | 4/20/13| ES|101| 19000| 4/20/17| 1| >>> | 4/20/13| OS|101| 1| 10/3/15| 2| >>> | 4/20/12| DS|102| 13000| 5/9/17| 1| >>> | 4/20/12| CS|102| 12000| 5/8/17| 2| >>> +++---+--+-+---+ >>> >>> >>> res = spark.sql("select * from (select *, row_number() over >>> (partition by id order by income_age_ts desc) r from t) x where r=1") >>> >>> res.show() >>> +++---+--+-+---+ >>> |JoinDate|dept| id|income|income_age_ts| r| >>> +++---+--+-+---+ >>> | 4/20/10| EQ|103| 1| 5/9/17| 1| >>> | 4/20/13| ES|101| 19000| 4/20/17| 1| >>> | 4/20/12| DS|102| 13000| 5/9/17| 1| >>> +++---+--+-+---+ >>> >>> This should be better because it uses all in-built optimizations in >>> Spark. >>> >>> Best >>> Ayan >>> >>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com >>> > wrote: >>> >>>> Please click on unnamed text/html link for better view >>>> >>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com> >>>> wrote: >>>> >>>>> >>>>> -- Forwarded message - >>>>> From: Mamillapalli, Purna Pradeep < >>>>> purnapradeep.mamillapa...@capitalone.com> >>>>> Date: Tue, Aug 29, 2017 at 8:08 PM >>>>> Subject: Spark question >>>>> To: purna pradeep <purna2prad...@gmail.com> >>>>> >>>>> Below is the input Dataframe(In real this is a very large Dataframe) >>>>> >>>>> >>>>> >>>>> EmployeeID >>>>> >>>>> INCOME >>>>> >>>>> INCOME AGE TS >>>>> >>>>> JoinDate >>>>> >>>>> Dept >>>>> >>>>> 101 >>>>> >>>>> 19000 >>>>> >>>>> 4/20/17 >>>>> >>>>> 4/20/13 >>>>> >>>>> ES >>>>> >>>>> 101 >>>>> >>>>> 1 >>>>> >>>>> 10/3/15 >>>>> >>>>> 4/20/13 >>>>
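A sketch of the same row_number approach through the DataFrame API instead of SQL (assuming Spark 2.2+ for to_date with a format, that income_age_ts parses with the M/d/yy pattern from the sample data, and the column names used in the example above):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val withTs = df.withColumn("income_age_dt", to_date(col("income_age_ts"), "M/d/yy"))

// Keep only incomes newer than 10 months, then take the latest one per id.
val w = Window.partitionBy("id").orderBy(col("income_age_dt").desc)
val latest = withTs
  .filter(months_between(current_date(), col("income_age_dt")) < 10)
  .withColumn("r", row_number().over(w))
  .filter(col("r") === 1)
  .drop("r", "income_age_dt")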
Re: Select entire row based on a logic applied on 2 columns across multiple rows
@ayan, Thanks for your response I would like to have functions in this case calculateIncome and the reason why I need function is to reuse in other parts of the application ..that's the reason I'm planning for mapgroups with function as argument which takes rowiterator ..but not sure if this is the best to implement as my initial dataframe is very large On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote: > Hi > > the tool you are looking for is window function. Example: > > >>> df.show() > +++---+--+-+ > |JoinDate|dept| id|income|income_age_ts| > +++---+--+-+ > | 4/20/13| ES|101| 19000| 4/20/17| > | 4/20/13| OS|101| 1| 10/3/15| > | 4/20/12| DS|102| 13000| 5/9/17| > | 4/20/12| CS|102| 12000| 5/8/17| > | 4/20/10| EQ|103| 1| 5/9/17| > | 4/20/10| MD|103| 9000| 5/8/17| > +++---+--+-+ > > >>> res = spark.sql("select *, row_number() over (partition by id order by > income_age_ts desc) r from t") > >>> res.show() > +++---+--+-+---+ > |JoinDate|dept| id|income|income_age_ts| r| > +++---+--+-+---+ > | 4/20/10| EQ|103| 1| 5/9/17| 1| > | 4/20/10| MD|103| 9000| 5/8/17| 2| > | 4/20/13| ES|101| 19000| 4/20/17| 1| > | 4/20/13| OS|101| 1| 10/3/15| 2| > | 4/20/12| DS|102| 13000| 5/9/17| 1| > | 4/20/12| CS|102| 12000| 5/8/17| 2| > +++---+--+-+---+ > > >>> res = spark.sql("select * from (select *, row_number() over (partition > by id order by income_age_ts desc) r from t) x where r=1") > >>> res.show() > +++---+--+-+---+ > |JoinDate|dept| id|income|income_age_ts| r| > +++---+--+-+---+ > | 4/20/10| EQ|103| 1| 5/9/17| 1| > | 4/20/13| ES|101| 19000| 4/20/17| 1| > | 4/20/12| DS|102| 13000| 5/9/17| 1| > +++---+--+-+---+ > > This should be better because it uses all in-built optimizations in Spark. > > Best > Ayan > > On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Please click on unnamed text/html link for better view >> >> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com> >> wrote: >> >>> >>> -- Forwarded message - >>> From: Mamillapalli, Purna Pradeep < >>> purnapradeep.mamillapa...@capitalone.com> >>> Date: Tue, Aug 29, 2017 at 8:08 PM >>> Subject: Spark question >>> To: purna pradeep <purna2prad...@gmail.com> >>> >>> Below is the input Dataframe(In real this is a very large Dataframe) >>> >>> >>> >>> EmployeeID >>> >>> INCOME >>> >>> INCOME AGE TS >>> >>> JoinDate >>> >>> Dept >>> >>> 101 >>> >>> 19000 >>> >>> 4/20/17 >>> >>> 4/20/13 >>> >>> ES >>> >>> 101 >>> >>> 1 >>> >>> 10/3/15 >>> >>> 4/20/13 >>> >>> OS >>> >>> 102 >>> >>> 13000 >>> >>> 5/9/17 >>> >>> 4/20/12 >>> >>> DS >>> >>> 102 >>> >>> 12000 >>> >>> 5/8/17 >>> >>> 4/20/12 >>> >>> CS >>> >>> 103 >>> >>> 1 >>> >>> 5/9/17 >>> >>> 4/20/10 >>> >>> EQ >>> >>> 103 >>> >>> 9000 >>> >>> 5/8/15 >>> >>> 4/20/10 >>> >>> MD >>> >>> Get the latest income of an employee which has Income_age ts <10 months >>> >>> Expected output Dataframe >>> >>> EmployeeID >>> >>> INCOME >>> >>> INCOME AGE TS >>> >>> JoinDate >>> >>> Dept >>> >>> 101 >>> >>> 19000 >>> >>> 4/20/17 >>> >>> 4/20/13 >>> >>> ES >>> >>> 102 >>> >>> 13000 >>> >>> 5/9/17 >>> >>> 4/20/12 >>> >>> DS >>> >>> 103 >>> >>> 1 >>> >>> 5/9/17 >>> >>> 4/20/10 >>> >
Re: Select entire row based on a logic applied on 2 columns across multiple rows
Please click on unnamed text/html link for better view On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com> wrote: > > -- Forwarded message - > From: Mamillapalli, Purna Pradeep < > purnapradeep.mamillapa...@capitalone.com> > Date: Tue, Aug 29, 2017 at 8:08 PM > Subject: Spark question > To: purna pradeep <purna2prad...@gmail.com> > > Below is the input Dataframe(In real this is a very large Dataframe) > > > > EmployeeID > > INCOME > > INCOME AGE TS > > JoinDate > > Dept > > 101 > > 19000 > > 4/20/17 > > 4/20/13 > > ES > > 101 > > 1 > > 10/3/15 > > 4/20/13 > > OS > > 102 > > 13000 > > 5/9/17 > > 4/20/12 > > DS > > 102 > > 12000 > > 5/8/17 > > 4/20/12 > > CS > > 103 > > 1 > > 5/9/17 > > 4/20/10 > > EQ > > 103 > > 9000 > > 5/8/15 > > 4/20/10 > > MD > > Get the latest income of an employee which has Income_age ts <10 months > > Expected output Dataframe > > EmployeeID > > INCOME > > INCOME AGE TS > > JoinDate > > Dept > > 101 > > 19000 > > 4/20/17 > > 4/20/13 > > ES > > 102 > > 13000 > > 5/9/17 > > 4/20/12 > > DS > > 103 > > 1 > > 5/9/17 > > 4/20/10 > > EQ > > > Below is what im planning to implement > > > > case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int, > *JOINDATE*: Int,DEPT:String) > > > > *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add( > *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*, > *"Date"*). add(*"DEPT"*,*"String"*) > > > > *//Reading from the File **import *sparkSession.implicits._ > > *val *readEmpFile = sparkSession.read > .option(*"sep"*, *","*) > .schema(empSchema) > .csv(INPUT_DIRECTORY) > > > *//Create employee DataFrame **val *custDf = readEmpFile.as[employee] > > > *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.* > EmployeeID*) > > > *val *k = groupByDf.mapGroups((key,value) => performETL(value)) > > > > > > *def *performETL(empData: Iterator[employee]) : new employee = { > > *val *empList = empData.toList > > > *//calculate income has Logic to figureout latest income for an account > and returns latest income val *income = calculateIncome(empList) > > > *for *(i <- empList) { > > *val *row = i > > *return new *employee(row.EmployeeID, row.INCOMEAGE , income) > } > *return "Done"* > > > > } > > > > Is this a better approach or even the right approach to implement the > same.If not please suggest a better way to implement the same? > > > > -- > > The information contained in this e-mail is confidential and/or > proprietary to Capital One and/or its affiliates and may only be used > solely in performance of work or services for Capital One. The information > transmitted herewith is intended only for use by the individual or entity > to which it is addressed. If the reader of this message is not the intended > recipient, you are hereby notified that any review, retransmission, > dissemination, distribution, copying or other use of, or taking of any > action in reliance upon this information is strictly prohibited. If you > have received this communication in error, please contact the sender and > delete the material from your computer. >
Re: use WithColumn with external function in a java jar
Thanks, I'll check it out. On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam <praneeth.ga...@gmail.com> wrote: > You can create a UDF which will invoke your java lib > > def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: > String) => new MyJava().calculateExpense(pexpense.toDouble, > cexpense.toDouble)) > > > > > > On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> I have data in a DataFrame with below columns >> >> 1)Fileformat is csv >> 2)All below column datatypes are String >> >> employeeid,pexpense,cexpense >> >> Now I need to create a new DataFrame which has new column called >> `expense`, which is calculated based on columns `pexpense`, `cexpense`. >> >> The tricky part is the calculation algorithm is not an **UDF** function >> which I created, but it's an external function that needs to be imported >> from a Java library which takes primitive types as arguments - in this case >> `pexpense`, `cexpense` - to calculate the value required for new column. >> >> The external function signature >> >> public class MyJava >> >> { >> >> public Double calculateExpense(Double pexpense, Double cexpense) { >>// calculation >> } >> >> } >> >> So how can I invoke that external function to create a new calculated >> column. Can I register that external function as UDF in my Spark >> application? >> >> Stackoverflow reference >> >> >> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function >> >> >> >> >> >> >
Select entire row based on a logic applied on 2 columns across multiple rows
-- Forwarded message -
From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
Date: Tue, Aug 29, 2017 at 8:08 PM
Subject: Spark question
To: purna pradeep <purna2prad...@gmail.com>

Below is the input Dataframe (in real this is a very large Dataframe):

EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
101         19000   4/20/17        4/20/13   ES
101         1       10/3/15        4/20/13   OS
102         13000   5/9/17         4/20/12   DS
102         12000   5/8/17         4/20/12   CS
103         1       5/9/17         4/20/10   EQ
103         9000    5/8/15         4/20/10   MD

Get the latest income of an employee which has Income_age ts < 10 months.

Expected output Dataframe:

EmployeeID  INCOME  INCOME AGE TS  JoinDate  Dept
101         19000   4/20/17        4/20/13   ES
102         13000   5/9/17         4/20/12   DS
103         1       5/9/17         4/20/10   EQ

Below is what im planning to implement:

case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Int, JOINDATE: Int, DEPT: String)

val empSchema = new StructType().add("EmployeeID", "Int").add("INCOME", "Int")
  .add("INCOMEAGE", "Date").add("JOINDATE", "Date").add("DEPT", "String")

//Reading from the File
import sparkSession.implicits._

val readEmpFile = sparkSession.read
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)

//Create employee DataFrame
val custDf = readEmpFile.as[employee]

//Adding Salary Column
val groupByDf = custDf.groupByKey(a => a.EmployeeID)

val k = groupByDf.mapGroups((key, value) => performETL(value))

def performETL(empData: Iterator[employee]): new employee = {
  val empList = empData.toList

  //calculateIncome has logic to figure out latest income for an account and returns latest income
  val income = calculateIncome(empList)

  for (i <- empList) {
    val row = i
    return new employee(row.EmployeeID, row.INCOMEAGE, income)
  }
  return "Done"
}

Is this a better approach or even the right approach to implement the same? If not, please suggest a better way to implement the same.

--
The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
use WithColumn with external function in a java jar
I have data in a DataFrame with below columns 1)Fileformat is csv 2)All below column datatypes are String employeeid,pexpense,cexpense Now I need to create a new DataFrame which has new column called `expense`, which is calculated based on columns `pexpense`, `cexpense`. The tricky part is the calculation algorithm is not an **UDF** function which I created, but it's an external function that needs to be imported from a Java library which takes primitive types as arguments - in this case `pexpense`, `cexpense` - to calculate the value required for new column. The external function signature public class MyJava { public Double calculateExpense(Double pexpense, Double cexpense) { // calculation } } So how can I invoke that external function to create a new calculated column. Can I register that external function as UDF in my Spark application? Stackoverflow reference https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
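Building on the UDF suggestion in the reply above, a minimal sketch of the withColumn side (assuming the MyJava class is on the executors' classpath, e.g. shipped with --jars, and that the string columns always parse as doubles):

import org.apache.spark.sql.functions.{col, udf}

// Wrap the external Java method in a UDF; MyJava is instantiated inside the
// lambda so nothing non-serializable is captured.
val calculateExpense = udf { (pexpense: String, cexpense: String) =>
  new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble).doubleValue()
}

val result = df.withColumn("expense", calculateExpense(col("pexpense"), col("cexpense")))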
Re: Restart streaming query spark 2.1 structured streaming
And also is query.stop() is graceful stop operation?what happens to already received data will it be processed ? On Tue, Aug 15, 2017 at 7:21 PM purna pradeep <purna2prad...@gmail.com> wrote: > Ok thanks > > Few more > > 1.when I looked into the documentation it says onQueryprogress is not > threadsafe ,So Is this method would be the right place to refresh cache?and > no need to restart query if I choose listener ? > > The methods are not thread-safe as they may be called from different > threads. > > > > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala > > > > 2.if I use streamingquerylistner onqueryprogress my understanding is > method will be executed only when the query is in progress so if I refresh > data frame here without restarting query will it impact application ? > > 3.should I use unpersist (Boolean) blocking method or async method > unpersist() as the data size is big. > > I feel your solution is better as it stops query --> refresh cache --> > starts query if I compromise on little downtime even cached dataframe is > huge .I'm not sure how listener behaves as it's asynchronous, correct me if > I'm wrong. > > On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com> > wrote: > >> Both works. The asynchronous method with listener will have less of down >> time, just that the first trigger/batch after the asynchronous >> unpersist+persist will probably take longer as it has to reload the data. >> >> >> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com> >> wrote: >> >>> Thanks tathagata das actually I'm planning to something like this >>> >>> activeQuery.stop() >>> >>> //unpersist and persist cached data frame >>> >>> df.unpersist() >>> >>> //read the updated data //data size of df is around 100gb >>> >>> df.persist() >>> >>> activeQuery = startQuery() >>> >>> >>> the cached data frame size around 100gb ,so the question is this the >>> right place to refresh this huge cached data frame ? >>> >>> I'm also trying to refresh cached data frame in onqueryprogress() method >>> in a class which extends StreamingQuerylistner >>> >>> Would like to know which is the best place to refresh cached data frame >>> and why >>> >>> Thanks again for the below response >>> >>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das < >>> tathagata.das1...@gmail.com> wrote: >>> >>>> You can do something like this. >>>> >>>> >>>> def startQuery(): StreamingQuery = { >>>>// create your streaming dataframes >>>>// start the query with the same checkpoint directory} >>>> >>>> // handle to the active queryvar activeQuery: StreamingQuery = null >>>> while(!stopped) { >>>> >>>> if (activeQuery = null) { // if query not active, start query >>>> activeQuery = startQuery() >>>> >>>>} else if (shouldRestartQuery()) { // check your condition and >>>> restart query >>>> activeQuery.stop() >>>> activeQuery = startQuery() >>>>} >>>> >>>>activeQuery.awaitTermination(100) // wait for 100 ms. 
>>>>// if there is any error it will throw exception and quit the loop >>>>// otherwise it will keep checking the condition every 100ms} >>>> >>>> >>>> >>>> >>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com >>>> > wrote: >>>> >>>>> Thanks Michael >>>>> >>>>> I guess my question is little confusing ..let me try again >>>>> >>>>> >>>>> I would like to restart streaming query programmatically while my >>>>> streaming application is running based on a condition and why I want to do >>>>> this >>>>> >>>>> I want to refresh a cached data frame based on a condition and the >>>>> best way to do this restart streaming query suggested by Tdas below for >>>>> similar problem >>>>> >>>>> >>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e >>>>> >>&g
Re: Restart streaming query spark 2.1 structured streaming
Ok thanks Few more 1.when I looked into the documentation it says onQueryprogress is not threadsafe ,So Is this method would be the right place to refresh cache?and no need to restart query if I choose listener ? The methods are not thread-safe as they may be called from different threads. https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala 2.if I use streamingquerylistner onqueryprogress my understanding is method will be executed only when the query is in progress so if I refresh data frame here without restarting query will it impact application ? 3.should I use unpersist (Boolean) blocking method or async method unpersist() as the data size is big. I feel your solution is better as it stops query --> refresh cache --> starts query if I compromise on little downtime even cached dataframe is huge .I'm not sure how listener behaves as it's asynchronous, correct me if I'm wrong. On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com> wrote: > Both works. The asynchronous method with listener will have less of down > time, just that the first trigger/batch after the asynchronous > unpersist+persist will probably take longer as it has to reload the data. > > > On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Thanks tathagata das actually I'm planning to something like this >> >> activeQuery.stop() >> >> //unpersist and persist cached data frame >> >> df.unpersist() >> >> //read the updated data //data size of df is around 100gb >> >> df.persist() >> >> activeQuery = startQuery() >> >> >> the cached data frame size around 100gb ,so the question is this the >> right place to refresh this huge cached data frame ? >> >> I'm also trying to refresh cached data frame in onqueryprogress() method >> in a class which extends StreamingQuerylistner >> >> Would like to know which is the best place to refresh cached data frame >> and why >> >> Thanks again for the below response >> >> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das < >> tathagata.das1...@gmail.com> wrote: >> >>> You can do something like this. >>> >>> >>> def startQuery(): StreamingQuery = { >>>// create your streaming dataframes >>>// start the query with the same checkpoint directory} >>> >>> // handle to the active queryvar activeQuery: StreamingQuery = null >>> while(!stopped) { >>> >>>if (activeQuery = null) { // if query not active, start query >>> activeQuery = startQuery() >>> >>>} else if (shouldRestartQuery()) { // check your condition and >>> restart query >>> activeQuery.stop() >>> activeQuery = startQuery() >>>} >>> >>>activeQuery.awaitTermination(100) // wait for 100 ms. 
>>>// if there is any error it will throw exception and quit the loop >>>// otherwise it will keep checking the condition every 100ms} >>> >>> >>> >>> >>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com> >>> wrote: >>> >>>> Thanks Michael >>>> >>>> I guess my question is little confusing ..let me try again >>>> >>>> >>>> I would like to restart streaming query programmatically while my >>>> streaming application is running based on a condition and why I want to do >>>> this >>>> >>>> I want to refresh a cached data frame based on a condition and the best >>>> way to do this restart streaming query suggested by Tdas below for similar >>>> problem >>>> >>>> >>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e >>>> >>>> I do understand that checkpoint if helps in recovery and failures but I >>>> would like to know "how to restart streaming query programmatically without >>>> stopping my streaming application" >>>> >>>> In place of query.awaittermination should I need to have an logic to >>>> restart query? Please suggest >>>> >>>> >>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust < >>>> mich...@databricks.com> wrote: >>>> >>>>> See >>>>> https://spark.apache.org/docs/latest/structur
Re: Restart streaming query spark 2.1 structured streaming
Thanks tathagata das actually I'm planning to something like this activeQuery.stop() //unpersist and persist cached data frame df.unpersist() //read the updated data //data size of df is around 100gb df.persist() activeQuery = startQuery() the cached data frame size around 100gb ,so the question is this the right place to refresh this huge cached data frame ? I'm also trying to refresh cached data frame in onqueryprogress() method in a class which extends StreamingQuerylistner Would like to know which is the best place to refresh cached data frame and why Thanks again for the below response On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com> wrote: > You can do something like this. > > > def startQuery(): StreamingQuery = { >// create your streaming dataframes >// start the query with the same checkpoint directory} > > // handle to the active queryvar activeQuery: StreamingQuery = null > while(!stopped) { > >if (activeQuery = null) { // if query not active, start query > activeQuery = startQuery() > >} else if (shouldRestartQuery()) { // check your condition and > restart query > activeQuery.stop() > activeQuery = startQuery() >} > >activeQuery.awaitTermination(100) // wait for 100 ms. >// if there is any error it will throw exception and quit the loop >// otherwise it will keep checking the condition every 100ms} > > > > > On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Thanks Michael >> >> I guess my question is little confusing ..let me try again >> >> >> I would like to restart streaming query programmatically while my >> streaming application is running based on a condition and why I want to do >> this >> >> I want to refresh a cached data frame based on a condition and the best >> way to do this restart streaming query suggested by Tdas below for similar >> problem >> >> >> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e >> >> I do understand that checkpoint if helps in recovery and failures but I >> would like to know "how to restart streaming query programmatically without >> stopping my streaming application" >> >> In place of query.awaittermination should I need to have an logic to >> restart query? Please suggest >> >> >> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com> >> wrote: >> >>> See >>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing >>> >>> Though I think that this currently doesn't work with the console sink. >>> >>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com> >>> wrote: >>> >>>> Hi, >>>> >>>>> >>>>> I'm trying to restart a streaming query to refresh cached data frame >>>>> >>>>> Where and how should I restart streaming query >>>>> >>>> >>>> >>>> val sparkSes = SparkSession >>>> >>>> .builder >>>> >>>> .config("spark.master", "local") >>>> >>>> .appName("StreamingCahcePoc") >>>> >>>> .getOrCreate() >>>> >>>> >>>> >>>> import sparkSes.implicits._ >>>> >>>> >>>> >>>> val dataDF = sparkSes.readStream >>>> >>>> .schema(streamSchema) >>>> >>>> .csv("testData") >>>> >>>> >>>> >>>> >>>> >>>>val query = counts.writeStream >>>> >>>> .outputMode("complete") >>>> >>>> .format("console") >>>> >>>> .start() >>>> >>>> >>>> query.awaittermination() >>>> >>>> >>>> >>>>> >>>>> >>>>> >>> >
Re: Restart streaming query spark 2.1 structured streaming
Thanks Michael I guess my question is little confusing ..let me try again I would like to restart streaming query programmatically while my streaming application is running based on a condition and why I want to do this I want to refresh a cached data frame based on a condition and the best way to do this restart streaming query suggested by Tdas below for similar problem http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e I do understand that checkpoint if helps in recovery and failures but I would like to know "how to restart streaming query programmatically without stopping my streaming application" In place of query.awaittermination should I need to have an logic to restart query? Please suggest On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com> wrote: > See > https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing > > Though I think that this currently doesn't work with the console sink. > > On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com> > wrote: > >> Hi, >> >>> >>> I'm trying to restart a streaming query to refresh cached data frame >>> >>> Where and how should I restart streaming query >>> >> >> >> val sparkSes = SparkSession >> >> .builder >> >> .config("spark.master", "local") >> >> .appName("StreamingCahcePoc") >> >> .getOrCreate() >> >> >> >> import sparkSes.implicits._ >> >> >> >> val dataDF = sparkSes.readStream >> >> .schema(streamSchema) >> >> .csv("testData") >> >> >> >> >> >>val query = counts.writeStream >> >> .outputMode("complete") >> >> .format("console") >> >> .start() >> >> >> query.awaittermination() >> >> >> >>> >>> >>> >
Restart streaming query spark 2.1 structured streaming
Hi, > > I'm trying to restart a streaming query to refresh cached data frame > > Where and how should I restart streaming query > val sparkSes = SparkSession .builder .config("spark.master", "local") .appName("StreamingCahcePoc") .getOrCreate() import sparkSes.implicits._ val dataDF = sparkSes.readStream .schema(streamSchema) .csv("testData") val query = counts.writeStream .outputMode("complete") .format("console") .start() query.awaittermination() > > >
StreamingQueryListner spark structered Streaming
Im working on structered streaming application wherein im reading from Kafka as stream and for each batch of streams i need to perform S3 lookup file (which is nearly 200gb) to fetch some attributes .So im using df.persist() (basically caching the lookup) but i need to refresh the dataframe as the S3 lookup data changes frequently.im using below code class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener { override def onQueryStarted(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryStartedEvent): Unit = {} override def onQueryTerminated(event: org.apache.spark.sql.streaming.StreamingQueryListener.QueryTerminatedEvent): Unit = {} override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = { val currTime = System.currentTimeMillis() if (currTime > (latestrefreshtime mentioned in a globaltempview)) { //oldDF is a cached Dataframe created from GlobalTempView which is of size 150GB. oldDF.unpersist() //I guess this is async call ,should i use unpersist(true) which is blocking?and is it safe ? val inputDf: DataFrame = readFile(spec, sparkSession) val recreateddf = inputDf.persist() val count = recreateddf.count() } } } } Is the above approach is a better solution to refresh cached dataframe? and the trigger for this refresh is will store the expirydate of cache for S3 in a globaltempview . Note:S3 is one lookup source but i do have other sources which has data size of 20 to 30 GB - So the question is this the right place to refresh the cached df ? - if yes should i use blocking or non-blocking unpersist method as the data is huge 15GB? - For similar issue i see below response from Tdas with subject as Re: Refreshing a persisted RDD "Yes, you will have to recreate the streaming Dataframe along with the static Dataframe, and restart the query. There isnt a currently feasible to do this without a query restart. But restarting a query WITHOUT restarting the whole application + spark cluster, is reasonably fast. If your applicatoin can tolerate 10 second latencies, then stopping and restarting a query within the same Spark application is a reasonable solution." [http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser] [1]: http://SparkMailingList So if thats better solution should i restart query as below query.processAllavaialble() query.stop() df.unpersist() val inputDf: DataFrame = readFile(spec, sparkSession) //read file from S3 or anyother source val recreateddf = inputDf.persist() query.start() when i looked into spark documentation of above methods void processAllAvailable() ///documentation says This method is intended for testing/// Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a Source prior to invocation. (i.e. getOffset must immediately reflect the addition). stop() Stops the execution of this query if it is running. This method blocks until the threads performing execution has stopped. https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable() Please suggest a better approach to refresh the cache.
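For what it's worth, a minimal sketch of the stop/refresh/restart loop discussed above (the helpers readLookup, startQuery and cacheExpired, the stopped flag, and the 10s poll interval are placeholders, not from this thread):

var lookupDf = readLookup().persist()      // rebuild and cache the S3 lookup DataFrame
lookupDf.count()                           // force materialization up front
var query = startQuery(lookupDf)           // start the streaming query against the cached lookup

while (!stopped) {
  query.awaitTermination(10000)            // returns after 10s, or earlier if the query dies
  if (cacheExpired()) {
    query.stop()                           // stop the query; the checkpoint lets the restart resume from the last committed offsets
    lookupDf.unpersist(blocking = true)    // blocking unpersist so the old copy is dropped before re-caching
    lookupDf = readLookup().persist()
    lookupDf.count()
    query = startQuery(lookupDf)           // restart with the same checkpointLocation
  }
}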
Spark streaming - TIBCO EMS
What is the best way to connect to TIBCO EMS using Spark Streaming? Do we need to write custom receivers, or do any libraries already exist? Thanks, Pradeep
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
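Since TIBCO EMS is a JMS provider, one option is the custom-receiver route. A sketch (assuming the EMS JMS client jars are on the classpath; the factory class usage, server URL, queue name and credentials are placeholders):

import javax.jms.{Connection, Message, MessageListener, Session, TextMessage}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class EmsReceiver(serverUrl: String, queueName: String, user: String, password: String)
    extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  @transient private var connection: Connection = _

  override def onStart(): Unit = {
    // TibjmsConnectionFactory ships with the TIBCO EMS client library (tibjms.jar).
    val factory = new com.tibco.tibjms.TibjmsConnectionFactory(serverUrl)
    connection = factory.createConnection(user, password)
    val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
    val consumer = session.createConsumer(session.createQueue(queueName))
    consumer.setMessageListener(new MessageListener {
      override def onMessage(message: Message): Unit = message match {
        case t: TextMessage => store(t.getText)   // hand the payload to Spark
        case _              =>                    // non-text messages are ignored in this sketch
      }
    })
    connection.start()
  }

  override def onStop(): Unit = {
    if (connection != null) connection.close()
  }
}

// Usage: ssc.receiverStream(new EmsReceiver("tcp://ems-host:7222", "my.queue", "user", "pass"))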
Re: Long Shuffle Read Blocked Time
Hi All, It appears that the bottleneck in my job was the EBS volumes. Very high i/o wait times across the cluster. I was only using 1 volume. Increasing to 4 made it faster. Thanks, Pradeep On Thu, Apr 20, 2017 at 3:12 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote: > Hi All, > > I have a simple ETL job that reads some data, shuffles it and writes it > back out. This is running on AWS EMR 5.4.0 using Spark 2.1.0. > > After Stage 0 completes and the job starts Stage 1, I see a huge slowdown > in the job. The CPU usage is low on the cluster, as is the network I/O. > From the Spark Stats, I see large values for the Shuffle Read Blocked Time. > As an example, one of my tasks completed in 18 minutes, but spent 15 > minutes waiting for remote reads. > > I'm not sure why the shuffle is so slow. Are there things I can do to > increase the performance of the shuffle? > > Thanks, > Pradeep >
Long Shuffle Read Blocked Time
Hi All, I have a simple ETL job that reads some data, shuffles it and writes it back out. This is running on AWS EMR 5.4.0 using Spark 2.1.0. After Stage 0 completes and the job starts Stage 1, I see a huge slowdown in the job. The CPU usage is low on the cluster, as is the network I/O. From the Spark Stats, I see large values for the Shuffle Read Blocked Time. As an example, one of my tasks completed in 18 minutes, but spent 15 minutes waiting for remote reads. I'm not sure why the shuffle is so slow. Are there things I can do to increase the performance of the shuffle? Thanks, Pradeep
Spark subscribe
Hi, Can you please add me to the Spark subscription list? Regards, Pradeep S
wholeTextFiles()
Hi, Why is there a restriction on the max file size that can be read by the wholeTextFiles() method? I can read a 1.5 GB file but get an out-of-memory error for a 2 GB file. Also, how can I raise this as a defect in the Spark JIRA? Can someone please guide me. Thanks, Pradeep
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
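A likely explanation (my note, not from the thread): wholeTextFiles() materializes each file as a single in-memory record, and a JVM array/String tops out around Integer.MAX_VALUE (~2 GB), so a file of roughly 2 GB cannot be held as one value no matter how large the heap is. If per-line processing is acceptable, textFile() sidesteps the single-record limit:

// Each element is one line instead of one whole file, so no single record approaches 2 GB.
val lines = sc.textFile("hdfs:///data/big-file.txt")   // placeholder path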
MLlib to Compute boundaries of a rectangle given random points on its Surface
Hello, Can someone please let me know if it is possible to construct a surface(for example:- Rectangle) given random points on its surface using Spark MLlib? Thanks Pradeep Gaddam This message and any attachments may contain confidential information of View, Inc. If you are not the intended recipient you are hereby notified that any dissemination, copying or distribution of this message, or files associated with this message, is strictly prohibited. If you have received this message in error, please notify us immediately by replying to the message and delete the message from your computer.
Re: Design patterns for Spark implementation
I was hoping for someone to answer this question, As it resonates with many developers who are new to Spark and trying to adopt it at their work. Regards Pradeep On Dec 3, 2016, at 9:00 AM, Vasu Gourabathina <vgour...@gmail.com<mailto:vgour...@gmail.com>> wrote: Hi, I know this is a broad question. If this is not the right forum, appreciate if you can point to other sites/areas that may be helpful. Before posing this question, I did use our friend Google, but sanitizing the query results from my need angle hasn't been easy. Who I am: - Have done data processing and analytics, but relatively new to Spark world What I am looking for: - Architecture/Design of a ML system using Spark - In particular, looking for best practices that can support/bridge both Engineering and Data Science teams Engineering: - Build a system that has typical engineering needs, data processing, scalability, reliability, availability, fault-tolerance etc. - System monitoring etc. Data Science: - Build a system for Data Science team to do data exploration activities - Develop models using supervised learning and tweak models Data: - Batch and incremental updates - mostly structured or semi-structured (some data from transaction systems, weblogs, click stream etc.) - Steaming, in near term, but not to begin with Data Storage: - Data is expected to grow on a daily basis...so, system should be able to support and handle big data - May be, after further analysis, there might be a possibility/need to archive some of the data...it all depends on how the ML models were built and results were stored/used for future usage Data Analysis: - Obvious data related aspects, such as data cleansing, data transformation, data partitioning etc - May be run models on windows of data. For example: last 1-year, 2-years etc. ML models: - Ability to store model versions and previous results - Compare results of different variants of models Consumers: - RESTful webservice clients to look at the results So, the questions I have are: 1) Are there architectural and design patterns that I can use based on industry best-practices. In particular: - data ingestion - data storage (for eg. go with HDFS or not) - data partitioning, especially in Spark world - running parallel ML models and combining results etc. - consumption of final results by clients (for eg. by pushing results to Cassandra, NoSQL dbs etc.) Again, I know this is a broad questionPointers to some best-practices in some of the areas, if not all, would be highly appreciated. Open to purchase any books that may have relevant information. Thanks much folks, Vasu. This message and any attachments may contain confidential information of View, Inc. If you are not the intended recipient you are hereby notified that any dissemination, copying or distribution of this message, or files associated with this message, is strictly prohibited. If you have received this message in error, please notify us immediately by replying to the message and delete the message from your computer.
Kafka message metadata with Dstreams
Hi All, I am using DStreams to read Kafka topics. While I can read the messages fine, I also want to get metadata on each message, such as its offset and the time it was put on the topic. Is there any Java API to get this info? Thanks, Pradeep
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
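If this is the kafka-0-10 direct stream (an assumption; shown in Scala here, but the same ConsumerRecord accessors are available from the Java API), each record already carries its metadata, and per-batch offset ranges are available too:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010.HasOffsetRanges

// stream is the InputDStream[ConsumerRecord[String, String]] returned by KafkaUtils.createDirectStream
stream.foreachRDD { rdd =>
  // Per-partition offset ranges for the whole micro-batch:
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Per-message metadata (println goes to the executor logs):
  rdd.foreach { record: ConsumerRecord[String, String] =>
    println(s"topic=${record.topic} partition=${record.partition} " +
      s"offset=${record.offset} timestamp=${record.timestamp}")
  }
}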
Log rollover in spark streaming jobs
Hi All, I am running Java Spark streaming jobs in yarn-client mode. Is there a way I can manage log rollover on the edge node? I have a 10 second batch and the log file volume is huge. Thanks, Pradeep
- To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Stop Spark Streaming Jobs
Thanks Park. I am doing the same. Was trying to understand if there are other ways. Thanks, Pradeep > On Aug 2, 2016, at 10:25 PM, Park Kyeong Hee <kh1979.p...@samsung.com> wrote: > > So sorry. Your name was Pradeep !! > > -Original Message- > From: Park Kyeong Hee [mailto:kh1979.p...@samsung.com] > Sent: Wednesday, August 03, 2016 11:24 AM > To: 'Pradeep'; 'user@spark.apache.org' > Subject: RE: Stop Spark Streaming Jobs > > Hi. Paradeep > > > Did you mean, how to kill the job? > If yes, you should kill the driver and follow next. > > on yarn-client > 1. find pid - "ps -es | grep " > 2. kill it - "kill -9 " > 3. check executors were down - "yarn application -list" > > on yarn-cluster > 1. find driver's application ID - "yarn application -list" > 2. stop it - "yarn application -kill " > 3. check driver and executors were down - "yarn application -list" > > > Thanks. > > -Original Message- > From: Pradeep [mailto:pradeep.mi...@mail.com] > Sent: Wednesday, August 03, 2016 10:48 AM > To: user@spark.apache.org > Subject: Stop Spark Streaming Jobs > > Hi All, > > My streaming job reads data from Kafka. The job is triggered and pushed to > background with nohup. > > What are the recommended ways to stop job either on yarn-client or cluster > mode. > > Thanks, > Pradeep > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Stop Spark Streaming Jobs
Hi All, My streaming job reads data from Kafka. The job is triggered and pushed to the background with nohup. What are the recommended ways to stop the job, in either yarn-client or yarn-cluster mode? Thanks, Pradeep - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: spark-submit hangs forever after all tasks finish(spark 2.0.0 stable version on yarn)
Hi, Are you running on yarn-client or cluster mode? Pradeep > On Jul 30, 2016, at 7:34 PM, taozhuo <taoz...@gmail.com> wrote: > > below is the error messages that seem run infinitely: > > > 16/07/30 23:25:38 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147247 > 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147247 > 16/07/30 23:25:39 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147248 > 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147248 > 16/07/30 23:25:40 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147249 > 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147249 > 16/07/30 23:25:41 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147250 > 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147250 > 16/07/30 23:25:42 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147251 > 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147251 > 16/07/30 23:25:43 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147252 > 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147252 > 16/07/30 23:25:44 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 0ms > 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147253 > 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147253 > 16/07/30 23:25:45 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 0ms > 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147254 > 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147254 > 16/07/30 23:25:46 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147255 > 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147255 > 16/07/30 23:25:47 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao sending #147256 > 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to > /10.80.1.168:8032 from zhuotao got value #147256 > 16/07/30 23:25:48 DEBUG ProtobufRpcEngine: Call: getApplicationReport took > 1ms > > > > > -- 
> View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-hangs-forever-after-all-tasks-finish-spark-2-0-0-stable-version-on-yarn-tp27436.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > - To unsubscribe e-mail: user-unsubscr...@spark.apache.org
Re: Spark Website
Worked for me if I go to https://spark.apache.org/site/ but not https://spark.apache.org On Wed, Jul 13, 2016 at 11:48 AM, Maurin Lenglart wrote: > Same here > > > > *From: *Benjamin Kim > *Date: *Wednesday, July 13, 2016 at 11:47 AM > *To: *manish ranjan > *Cc: *user > *Subject: *Re: Spark Website > > > > It takes me to the directories instead of the webpage. > > > > On Jul 13, 2016, at 11:45 AM, manish ranjan wrote: > > > > working for me. What do you mean 'as supposed to'? > > > ~Manish > > > > On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim wrote: > > Has anyone noticed that the spark.apache.org is not working as supposed > to? > > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > > > > >
Re: Spark-submit hangs indefinitely after job completion.
BTW, I am using a 6-node cluster with m4.2xlarge machines on amazon. I have tried with both yarn-cluster and spark's native cluster mode as well. On Tue, May 24, 2016 at 12:10 PM Mathieu Longtin <math...@closetwork.org> wrote: > I have been seeing the same behavior in standalone with a master. > > > On Tue, May 24, 2016 at 3:08 PM Pradeep Nayak <pradeep1...@gmail.com> > wrote: > >> >> >> I have posted the same question of Stack Overflow: >> http://stackoverflow.com/questions/37421852/spark-submit-continues-to-hang-after-job-completion >> >> I am trying to test spark 1.6 with hdfs in AWS. I am using the wordcount >> python example available in the examples folder. I submit the job with >> spark-submit, the job completes successfully and its prints the results on >> the console as well. The web-UI also says its completed. However the >> spark-submit never terminates. I have verified that the context is stopped >> in the word count example code as well. >> >> What could be wrong ? >> >> This is what I see on the console. >> >> >> 6-05-24 14:58:04,749 INFO [Thread-3] handler.ContextHandler >> (ContextHandler.java:doStop(843)) - stopped >> o.s.j.s.ServletContextHandler{/stages/stage,null}2016-05-24 14:58:04,749 >> INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - >> stopped o.s.j.s.ServletContextHandler{/stages/json,null}2016-05-24 >> 14:58:04,749 INFO [Thread-3] handler.ContextHandler >> (ContextHandler.java:doStop(843)) - stopped >> o.s.j.s.ServletContextHandler{/stages,null}2016-05-24 14:58:04,749 INFO >> [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - >> stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}2016-05-24 >> 14:58:04,750 INFO [Thread-3] handler.ContextHandler >> (ContextHandler.java:doStop(843)) - stopped >> o.s.j.s.ServletContextHandler{/jobs/job,null}2016-05-24 14:58:04,750 INFO >> [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - >> stopped o.s.j.s.ServletContextHandler{/jobs/json,null}2016-05-24 >> 14:58:04,750 INFO [Thread-3] handler.ContextHandler >> (ContextHandler.java:doStop(843)) - stopped >> o.s.j.s.ServletContextHandler{/jobs,null}2016-05-24 14:58:04,802 INFO >> [Thread-3] ui.SparkUI (Logging.scala:logInfo(58)) - Stopped Spark web UI at >> http://172.30.2.239:40402016-05-24 14:58:04,805 INFO [Thread-3] >> cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Shutting >> down all executors2016-05-24 14:58:04,805 INFO [dispatcher-event-loop-2] >> cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Asking >> each executor to shut down2016-05-24 14:58:04,814 INFO >> [dispatcher-event-loop-5] spark.MapOutputTrackerMasterEndpoint >> (Logging.scala:logInfo(58)) - MapOutputTrackerMasterEndpoint >> stopped!2016-05-24 14:58:04,818 INFO [Thread-3] storage.MemoryStore >> (Logging.scala:logInfo(58)) - MemoryStore cleared2016-05-24 14:58:04,818 >> INFO [Thread-3] storage.BlockManager (Logging.scala:logInfo(58)) - >> BlockManager stopped2016-05-24 14:58:04,820 INFO [Thread-3] >> storage.BlockManagerMaster (Logging.scala:logInfo(58)) - BlockManagerMaster >> stopped2016-05-24 14:58:04,821 INFO [dispatcher-event-loop-3] >> scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint >> (Logging.scala:logInfo(58)) - OutputCommitCoordinator stopped!2016-05-24 >> 14:58:04,824 INFO [Thread-3] spark.SparkContext (Logging.scala:logInfo(58)) >> - Successfully stopped SparkContext2016-05-24 14:58:04,827 INFO >> [sparkDriverActorSystem-akka.actor.default-dispatcher-2] >> 
remote.RemoteActorRefProvider$RemotingTerminator >> (Slf4jLogger.scala:apply$mcV$sp(74)) - Shutting down remote >> daemon.2016-05-24 14:58:04,828 INFO >> [sparkDriverActorSystem-akka.actor.default-dispatcher-2] >> remote.RemoteActorRefProvider$RemotingTerminator >> (Slf4jLogger.scala:apply$mcV$sp(74)) - Remote daemon shut down; proceeding >> with flushing remote transports.2016-05-24 14:58:04,843 INFO >> [sparkDriverActorSystem-akka.actor.default-dispatcher-2] >> remote.RemoteActorRefProvider$RemotingTerminator >> (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting shut down. >> >> >> I have to do a ctrl-c to terminate the spark-submit process. This is >> really a weird problem and I have no idea how to fix this. Please let me >> know if there are any logs I should be looking at, or doing things >> differently here. >> >> >> -- > Mathieu Longtin > 1-514-803-8977 >
Spark-submit hangs indefinitely after job completion.
I have posted the same question of Stack Overflow: http://stackoverflow.com/questions/37421852/spark-submit-continues-to-hang-after-job-completion I am trying to test spark 1.6 with hdfs in AWS. I am using the wordcount python example available in the examples folder. I submit the job with spark-submit, the job completes successfully and its prints the results on the console as well. The web-UI also says its completed. However the spark-submit never terminates. I have verified that the context is stopped in the word count example code as well. What could be wrong ? This is what I see on the console. 6-05-24 14:58:04,749 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/stages/stage,null}2016-05-24 14:58:04,749 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/stages/json,null}2016-05-24 14:58:04,749 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/stages,null}2016-05-24 14:58:04,749 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}2016-05-24 14:58:04,750 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs/job,null}2016-05-24 14:58:04,750 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs/json,null}2016-05-24 14:58:04,750 INFO [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs,null}2016-05-24 14:58:04,802 INFO [Thread-3] ui.SparkUI (Logging.scala:logInfo(58)) - Stopped Spark web UI at http://172.30.2.239:40402016-05-24 14:58:04,805 INFO [Thread-3] cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Shutting down all executors2016-05-24 14:58:04,805 INFO [dispatcher-event-loop-2] cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Asking each executor to shut down2016-05-24 14:58:04,814 INFO [dispatcher-event-loop-5] spark.MapOutputTrackerMasterEndpoint (Logging.scala:logInfo(58)) - MapOutputTrackerMasterEndpoint stopped!2016-05-24 14:58:04,818 INFO [Thread-3] storage.MemoryStore (Logging.scala:logInfo(58)) - MemoryStore cleared2016-05-24 14:58:04,818 INFO [Thread-3] storage.BlockManager (Logging.scala:logInfo(58)) - BlockManager stopped2016-05-24 14:58:04,820 INFO [Thread-3] storage.BlockManagerMaster (Logging.scala:logInfo(58)) - BlockManagerMaster stopped2016-05-24 14:58:04,821 INFO [dispatcher-event-loop-3] scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint (Logging.scala:logInfo(58)) - OutputCommitCoordinator stopped!2016-05-24 14:58:04,824 INFO [Thread-3] spark.SparkContext (Logging.scala:logInfo(58)) - Successfully stopped SparkContext2016-05-24 14:58:04,827 INFO [sparkDriverActorSystem-akka.actor.default-dispatcher-2] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Shutting down remote daemon.2016-05-24 14:58:04,828 INFO [sparkDriverActorSystem-akka.actor.default-dispatcher-2] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Remote daemon shut down; proceeding with flushing remote transports.2016-05-24 14:58:04,843 INFO [sparkDriverActorSystem-akka.actor.default-dispatcher-2] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting shut down. 
I have to do a ctrl-c to terminate the spark-submit process. This is really a weird problem and I have no idea how to fix this. Please let me know if there are any logs I should be looking at, or doing things differently here.
Is this possible to do in spark ?
Hi - I have a fairly unique problem which I am trying to solve, and I am not sure if Spark would help here. I have a directory /X/Y/a.txt and, in the same structure, /X/Y/Z/b.txt. a.txt contains a unique serial number, say 12345, and b.txt contains key-value pairs: a,1 b,1 c,0 etc. Every day you receive data for a system Y, so there are multiple a.txt and b.txt files for a serial number. The serial number doesn't change, and that's the key. So there are multiple systems, the data for a whole year is available, and it's huge. I am trying to generate a report of unique serial numbers where the value of option a has changed to 1 over the last few months. Let's say the default is 0. Also, figure out how many times it was toggled. I am not sure how to read two text files in Spark at the same time and associate them with the serial number. Is there a way of doing this in place, given that we know the directory structure? Or should we be transforming the data anyway to solve this?
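A rough sketch of one way to attack this - the path layout, the helper function and the ordering assumption below are all hypothetical; only wholeTextFiles and the pair-RDD operations are standard Spark API. wholeTextFiles keeps each file's path next to its content, so an a.txt and b.txt that landed under the same directory can be joined on that directory and then re-keyed by the serial number read from a.txt:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("serial-toggle-report"))

// assume daily drops like /data/<yyyy-MM-dd>/X/Y/a.txt and /data/<yyyy-MM-dd>/X/Y/Z/b.txt
def parentDir(path: String): String = path.split("/").dropRight(1).mkString("/")

val serials = sc.wholeTextFiles("/data/*/X/Y/a.txt")
  .map { case (path, body) => (parentDir(path), body.trim) }              // dir -> serial

val optionA = sc.wholeTextFiles("/data/*/X/Y/Z/b.txt")
  .map { case (path, body) => (parentDir(path).stripSuffix("/Z"), body) } // align with a.txt's dir
  .flatMapValues(_.split("\n"))
  .collect { case (dir, line) if line.trim.startsWith("a,") =>            // keep only option "a"
    (dir, line.trim.split(",")(1))
  }

// join a.txt and b.txt from the same drop, then gather option "a"'s history per serial
val history = serials.join(optionA)                                       // dir -> (serial, value)
  .map { case (dir, (serial, value)) => (serial, (dir, value)) }
  .groupByKey()

// serials whose option "a" ever became 1, plus a rough toggle count; sorting by the
// directory name assumes the date is encoded in the path
val report = history.mapValues { entries =>
  val ordered = entries.toSeq.sortBy(_._1).map(_._2)
  val toggles = ordered.sliding(2).count {
    case Seq(a, b) => a != b
    case _         => false
  }
  (ordered.contains("1"), toggles)
}.filter { case (_, (becameOne, _)) => becameOne }

report.collect().foreach(println)
sc.stop()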
how does sc.textFile translate regex in the input.
I am trying to understand how Spark's sc.textFile() works. I specifically have a question on how it translates paths with a regex in them. For example: files = sc.textFile("hdfs://:/file1/*/*/*/*.txt") How does it find all the sub-directories and recurse to all the leaf files? Is there any documentation on how this happens? Pradeep
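For what it's worth, these are not regexes: sc.textFile hands the string to Hadoop's FileInputFormat, which expands it with FileSystem glob semantics (*, ?, [abc], {alt1,alt2}), one path segment at a time. A quick illustration with hypothetical paths:

// each * matches entries within a single directory level, so every level to descend
// through needs its own wildcard, exactly as in the question above
val allParts  = sc.textFile("hdfs://namenode:8020/logs/2016/*/*/part-*.txt")
// brace alternation and comma-separated path lists also work
val twoMonths = sc.textFile("hdfs://namenode:8020/logs/2016/{01,02}/*/*.txt")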
Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?
IIRC, TextInputFormat supports an input path that is a comma separated list. I haven't tried this, but I think you should just be able to do sc.textFile("file1,file2,...") On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhangwrote: > I know these workaround, but wouldn't it be more convenient and > straightforward to use SparkContext#textFiles ? > > On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra > wrote: > >> For more than a small number of files, you'd be better off using >> SparkContext#union instead of RDD#union. That will avoid building up a >> lengthy lineage. >> >> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky >> wrote: >> >>> Hey Jeff, >>> Do you mean reading from multiple text files? In that case, as a >>> workaround, you can use the RDD#union() (or ++) method to concatenate >>> multiple rdds. For example: >>> >>> val lines1 = sc.textFile("file1") >>> val lines2 = sc.textFile("file2") >>> >>> val rdd = lines1 union lines2 >>> >>> regards, >>> --Jakob >>> >>> On 11 November 2015 at 01:20, Jeff Zhang wrote: >>> Although user can use the hdfs glob syntax to support multiple inputs. But sometimes, it is not convenient to do that. Not sure why there's no api of SparkContext#textFiles. It should be easy to implement that. I'd love to create a ticket and contribute for that if there's no other consideration that I don't know. -- Best Regards Jeff Zhang >>> >>> >> > > > -- > Best Regards > > Jeff Zhang >
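To make the SparkContext#union suggestion above concrete, a minimal sketch (the paths are hypothetical): build one RDD per file and union them through the context in a single call, which keeps the lineage flat compared with chaining RDD#union:

val paths = Seq("hdfs:///data/file1.txt", "hdfs:///data/file2.txt", "hdfs:///data/file3.txt")
val all = sc.union(paths.map(p => sc.textFile(p)))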
Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?
Looks like what I was suggesting doesn't work. :/ On Wed, Nov 11, 2015 at 4:49 PM, Jeff Zhang <zjf...@gmail.com> wrote: > Yes, that's what I suggest. TextInputFormat support multiple inputs. So in > spark side, we just need to provide API to for that. > > On Thu, Nov 12, 2015 at 8:45 AM, Pradeep Gollakota <pradeep...@gmail.com> > wrote: > >> IIRC, TextInputFormat supports an input path that is a comma separated >> list. I haven't tried this, but I think you should just be able to do >> sc.textFile("file1,file2,...") >> >> On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang <zjf...@gmail.com> wrote: >> >>> I know these workaround, but wouldn't it be more convenient and >>> straightforward to use SparkContext#textFiles ? >>> >>> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com> >>> wrote: >>> >>>> For more than a small number of files, you'd be better off using >>>> SparkContext#union instead of RDD#union. That will avoid building up a >>>> lengthy lineage. >>>> >>>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com> >>>> wrote: >>>> >>>>> Hey Jeff, >>>>> Do you mean reading from multiple text files? In that case, as a >>>>> workaround, you can use the RDD#union() (or ++) method to concatenate >>>>> multiple rdds. For example: >>>>> >>>>> val lines1 = sc.textFile("file1") >>>>> val lines2 = sc.textFile("file2") >>>>> >>>>> val rdd = lines1 union lines2 >>>>> >>>>> regards, >>>>> --Jakob >>>>> >>>>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote: >>>>> >>>>>> Although user can use the hdfs glob syntax to support multiple >>>>>> inputs. But sometimes, it is not convenient to do that. Not sure why >>>>>> there's no api of SparkContext#textFiles. It should be easy to implement >>>>>> that. I'd love to create a ticket and contribute for that if there's no >>>>>> other consideration that I don't know. >>>>>> >>>>>> -- >>>>>> Best Regards >>>>>> >>>>>> Jeff Zhang >>>>>> >>>>> >>>>> >>>> >>> >>> >>> -- >>> Best Regards >>> >>> Jeff Zhang >>> >> >> > > > -- > Best Regards > > Jeff Zhang >
Re: [SparkR] Missing Spark APIs in R
Thanks Shivaram. I watched your talk and the plan to use ML APIs with R flavor looks exciting. Is there a different venue where I would be able to follow the SparkR API progress? Thanks Pradeep On Mon, Jun 29, 2015 at 1:12 PM, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote: The RDD API is pretty complex and we are not yet sure we want to export all those methods in the SparkR API. We are working towards exposing a more limited API in upcoming versions. You can find some more details in the recent Spark Summit talk at https://spark-summit.org/2015/events/sparkr-the-past-the-present-and-the-future/ Thanks Shivaram On Mon, Jun 29, 2015 at 9:40 AM, Pradeep Bashyal prad...@bashyal.com wrote: Hello, I noticed that some of the spark-core APIs are not available with version 1.4.0 release of SparkR. For example textFile(), flatMap() etc. The code seems to be there but is not exported in NAMESPACE. They were all available as part of the AmpLab Extras previously. I wasn't able to find any explanations of why they were not included with the release. Can anyone shed some light on it? Thanks Pradeep
[SparkR] Missing Spark APIs in R
Hello, I noticed that some of the spark-core APIs are not available with version 1.4.0 release of SparkR. For example textFile(), flatMap() etc. The code seems to be there but is not exported in NAMESPACE. They were all available as part of the AmpLab Extras previously. I wasn't able to find any explanations of why they were not included with the release. Can anyone shed some light on it? Thanks Pradeep
Re: ClassCastException when calling updateStateKey
Hi Marcelo, I am not including Spark's classes. When I used the userClasspathFirst flag, I started getting those errors. Been there, done that. Removing Guava classes was one of the first things I tried. I saw your replies to a similar problem from Sept. http://apache-spark-developers-list.1001551.n3.nabble.com/guava-version-conflicts-td8480.html It looks like my issue is the same cause, but different symptoms. Thanks, Pradeep. On Fri, Apr 10, 2015 at 12:51 PM, Marcelo Vanzin van...@cloudera.com wrote: On Fri, Apr 10, 2015 at 10:11 AM, Pradeep Rai prai...@gmail.com wrote: I tried the userClasspathFirst flag. I started getting NoClassDefFoundError for Spark classes like Function2, etc. Wait. Are you including Spark classes in your app's assembly? Don't do that... As for Guava, yeah, the mess around Optional and friends is unfortunate. One way you could try to work around it, if excluding Spark classes and the userClassPathFirst option doesn't work, is to explicitly remove the Optional (and related) classes from your app's fat jar, and cross your fingers. -- Marcelo
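One way to act on Marcelo's suggestion without hand-editing the fat jar is to relocate Guava when the assembly is built. A sketch assuming the job is built with a recent sbt-assembly (Maven's shade plugin has an equivalent relocation feature); the shaded package name is a placeholder:

// in build.sbt: rewrite the application's Guava packages inside the fat jar so they
// can never collide with the older Guava that Spark/Hadoop put on the classpath
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "myapp.shaded.guava.@1").inAll
)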
How to set hadoop native library path in spark-1.1
Hi all, Can anyone tell me how to set the Hadoop native library path in Spark? Right now I am setting it using the SPARK_LIBRARY_PATH environment variable in spark-env.sh, but still no success. I am still seeing this in spark-shell: NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Thanks, Pradeep
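If I recall correctly, SPARK_LIBRARY_PATH was deprecated in the 1.x line in favor of the extraLibraryPath settings, so a sketch of the usual alternatives (all paths below are placeholders for wherever libhadoop.so actually lives):

# in conf/spark-env.sh
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH

# or per job, on the submit command line (spark-shell accepts the same options)
spark-submit \
  --driver-library-path /opt/hadoop/lib/native \
  --conf spark.executor.extraLibraryPath=/opt/hadoop/lib/native \
  ...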
Re: Spark packaging
Thanks Prabeesh. On Wed, Apr 9, 2014 at 12:37 AM, prabeesh k prabsma...@gmail.com wrote: Please refer to http://prabstechblog.blogspot.in/2014/04/creating-single-jar-for-spark-project.html Regards, prabeesh On Wed, Apr 9, 2014 at 1:04 PM, Pradeep baji pradeep.chanum...@gmail.com wrote: Hi all, I am new to Spark and trying to learn it. Is there any document which describes how Spark is packaged (like the dependencies needed to build Spark, which jar contains what after the build, etc.)? Thanks for the help. Regards, Pradeep
Multi master Spark
Hi, I want to enable Spark Master HA in Spark. The documentation specifies that we can do this with the help of ZooKeeper. But what I am unsure about is how to configure one master with the other and, similarly, how the workers know that they have two masters. Where do you specify the multi-master information? Thanks for the help. Thanks, Pradeep
Re: Multi master Spark
Thanks Dmitriy. But I want multi-master support when running Spark standalone. Also, I want to know if this multi-master setup works if I use spark-shell. On Wed, Apr 9, 2014 at 3:26 PM, Dmitriy Lyubimov dlie...@gmail.com wrote: The only way i know to do this is to use mesos with zookeepers. you specify zookeeper url as spark url that contains multiple zookeeper hosts. Multiple mesos masters are then elected thru zookeeper leader election until current leader dies; at which point mesos will elect another master (if still left). iirc, in this mode spark master never runs, only master slaves are being spun by mesos slaves directly. On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch pradeep.chanum...@gmail.com wrote: Hi, I want to enable Spark Master HA in spark. Documentation specifies that we can do this with the help of Zookeepers. But what I am worried is how to configure one master with the other and similarly how do workers know that the have two masters? where do you specify the multi-master information? Thanks for the help. Thanks, Pradeep
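For the standalone case (no Mesos), the documented route is ZooKeeper-based recovery configured on every master, with workers and spark-shell given the full list of masters. A sketch, assuming a ZooKeeper ensemble at zk1/zk2/zk3 (hostnames and ports are placeholders, and the exact start scripts vary a little between Spark versions):

# in conf/spark-env.sh on each master and worker
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

# start a master on two (or more) hosts, then point workers and spark-shell at the
# whole list; they register with the current leader and fail over automatically
./sbin/start-slave.sh spark://master1:7077,master2:7077
./bin/spark-shell --master spark://master1:7077,master2:7077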