Dynamic executor scaling spark/Kubernetes

2019-04-16 Thread purna pradeep
Hello,

Is Kubernetes dynamic executor scaling for Spark available in the latest
release of Spark?

I mean scaling the executors based on the workload vs. preallocating a fixed
number of executors for a Spark job.

Thanks,
Purna
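
For reference, the standard dynamic allocation knobs look roughly like the
sketch below (a minimal sketch only; whether the Kubernetes backend honors
them depends on the Spark release, and classic dynamic allocation also assumes
an external shuffle service, so treat this as an assumption rather than a
confirmed K8s feature):

// Minimal sketch of the standard spark.dynamicAllocation.* settings
// (assumption: a Spark release / cluster manager where dynamic allocation is
// supported; on older K8s backends an external shuffle service may not be
// available, in which case these settings may have no effect).
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-allocation-sketch")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .config("spark.dynamicAllocation.initialExecutors", "2")
  .getOrCreate()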


Re: [ANNOUNCE] Announcing Apache Spark 2.4.0

2018-11-09 Thread purna pradeep
Thanks, this is great news.

Can you please let me know if dynamic resource allocation is available in
Spark 2.4?

I'm using Spark 2.3.2 on Kubernetes; do I still need to provide executor
memory options as part of the spark-submit command, or will Spark manage the
required executor memory based on the Spark job size?
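
For context, explicitly sizing executors (rather than relying on any automatic
sizing) usually looks roughly like the sketch below; the values are
placeholders, not recommendations:

// Minimal sketch of explicit executor sizing via SparkConf (equivalent to
// passing --conf flags to spark-submit); the numbers are placeholders only.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.executor.instances", "5")
  .set("spark.executor.memory", "4g")
  .set("spark.executor.cores", "2")

val spark = SparkSession.builder().config(conf).getOrCreate()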

On Thu, Nov 8, 2018 at 2:18 PM Marcelo Vanzin 
wrote:

> +user@
>
> >> -- Forwarded message -
> >> From: Wenchen Fan 
> >> Date: Thu, Nov 8, 2018 at 10:55 PM
> >> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
> >> To: Spark dev list 
> >>
> >>
> >> Hi all,
> >>
> >> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release
> adds Barrier Execution Mode for better integration with deep learning
> frameworks, introduces 30+ built-in and higher-order functions to deal with
> complex data types more easily, and improves the K8s integration, along with
> experimental Scala 2.12 support. Other major updates include the built-in
> Avro data source, the Image data source, flexible streaming sinks, elimination
> of the 2GB block size limitation during transfer, and Pandas UDF improvements.
> In addition, this release continues to focus on usability, stability, and
> polish while resolving around 1100 tickets.
> >>
> >> We'd like to thank our contributors and users for their contributions
> and early feedback to this release. This release would not have been
> possible without you.
> >>
> >> To download Spark 2.4.0, head over to the download page:
> http://spark.apache.org/downloads.html
> >>
> >> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-0.html
> >>
> >> Thanks,
> >> Wenchen
> >>
> >> PS: If you see any issues with the release notes, webpage or published
> artifacts, please contact me directly off-list.
>
>
>
> --
> Marcelo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
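
As a small illustration of the higher-order functions mentioned in the
announcement above, a minimal sketch (the DataFrame and the column name
"values" are made up for the example):

// Minimal sketch of a Spark 2.4 higher-order function used via a SQL
// expression; the data and column name are illustrative only.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hof-sketch").getOrCreate()
import spark.implicits._

val df = Seq(Seq(1, 2, 3), Seq(4, 5)).toDF("values")

// transform() applies the lambda to every element of the array column
df.selectExpr("transform(values, x -> x + 1) AS incremented").show()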


Spark 2.3.1: k8s driver pods stuck in Initializing state

2018-09-26 Thread purna pradeep
Hello ,


We're running Spark 2.3.1 on Kubernetes v1.11.0 and our driver pods on k8s
are getting stuck in the Initializing state, like so:

NAME                                             READY   STATUS     RESTARTS   AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver   0/1     Init:0/1   0          18h


And from kubectl describe pod:

  Warning  FailedMount  9m (x128 over 4h)  kubelet, 10.47.96.167  Unable to
mount volumes for pod
"my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)":
timeout expired waiting for volumes to attach or mount for pod
"spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted
volumes=[spark-init-properties]. list of unattached
volumes=[spark-init-properties download-jars-volume download-files-volume
spark-token-tfpvp]
  Warning  FailedMount  4m (x153 over 4h)  kubelet, 10.47.96.167
MountVolume.SetUp failed for volume "spark-init-properties" : configmaps
"my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in kubectl get configmap, the init configmap for the
driver pod isn't there.

Am I correct in assuming that since the configmap isn't being created, the
driver pod will never start (hence stuck in init)?

Where does the init configmap come from?

Why would it not be created?


Please suggest.

Thanks,
Purna


Spark 2.3.1: k8s driver pods stuck in Initializing state

2018-09-26 Thread Purna Pradeep Mamillapalli
We're running Spark 2.3.1 on Kubernetes v1.11.0 and our driver pods on k8s
are getting stuck in the Initializing state, like so:

NAME                                             READY   STATUS     RESTARTS   AGE
my-pod-fd79926b819d3b34b05250e23347d0e7-driver   0/1     Init:0/1   0          18h


And from kubectl describe pod:

  Warning  FailedMount  9m (x128 over 4h)  kubelet, 10.47.96.167  Unable to
mount volumes for pod
"my-pod-fd79926b819d3b34b05250e23347d0e7-driver_spark(1f3aba7b-c10f-11e8-bcec-1292fec79aba)":
timeout expired waiting for volumes to attach or mount for pod
"spark"/"my-pod-fd79926b819d3b34b05250e23347d0e7-driver". list of unmounted
volumes=[spark-init-properties]. list of unattached
volumes=[spark-init-properties download-jars-volume download-files-volume
spark-token-tfpvp]
  Warning  FailedMount  4m (x153 over 4h)  kubelet, 10.47.96.167
MountVolume.SetUp failed for volume "spark-init-properties" : configmaps
"my-pod-fd79926b819d3b34b05250e23347d0e7-init-config" not found

From what I can see in kubectl get configmap, the init configmap for the
driver pod isn't there.

Am I correct in assuming that since the configmap isn't being created, the
driver pod will never start (hence stuck in init)?

Where does the init configmap come from?

Why would it not be created?

Thanks,
Christopher Carney




Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-17 Thread purna pradeep
Resurfacing the question to get more attention.

Hello,
>
> I'm running a Spark 2.3 job on a Kubernetes cluster
>>
>> kubectl version
>>
>> Client Version: version.Info{Major:"1", Minor:"9",
>> GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b",
>> GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z",
>> GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
>>
>> Server Version: version.Info{Major:"1", Minor:"8",
>> GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd",
>> GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z",
>> GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>>
>>
>>
>> When I run spark-submit against the k8s master, the driver pod gets stuck
>> in the Waiting: PodInitializing state.
>> I had to manually kill the driver pod and submit a new job in this case;
>> then it works. How can this be handled in production?
>>
> This happens with executor pods as well
>

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128
>
>
>>
>> This happens if I submit the jobs almost in parallel, i.e., submit 5 jobs
>> one after the other simultaneously.
>>
>> I'm running Spark jobs on 20 nodes, each having the below configuration.
>>
>> I tried kubectl describe node on the node where the driver pod is
>> running, and this is what I got. I do see there is overcommit on resources,
>> but I expected the Kubernetes scheduler not to schedule if resources on a
>> node are overcommitted or the node is in Not Ready state. In this case the
>> node is in Ready state, but I observe the same behaviour if the node is in
>> "Not Ready" state.
>>
>>
>>
>> Name:   **
>>
>> Roles:  worker
>>
>> Labels: beta.kubernetes.io/arch=amd64
>>
>> beta.kubernetes.io/os=linux
>>
>> kubernetes.io/hostname=
>>
>> node-role.kubernetes.io/worker=true
>>
>> Annotations:node.alpha.kubernetes.io/ttl=0
>>
>>
>> volumes.kubernetes.io/controller-managed-attach-detach=true
>>
>> Taints: 
>>
>> CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
>>
>> Conditions:
>>
>>   Type Status  LastHeartbeatTime
>> LastTransitionTimeReason   Message
>>
>>    --  -
>> ----   ---
>>
>>   OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
>> Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
>> sufficient disk space available
>>
>>   MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
>> Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
>> sufficient memory available
>>
>>   DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
>> Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
>> pressure
>>
>>   ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
>> Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
>> ready status. AppArmor enabled
>>
>> Addresses:
>>
>>   InternalIP:  *
>>
>>   Hostname:**
>>
>> Capacity:
>>
>>  cpu: 16
>>
>>  memory:  125827288Ki
>>
>>  pods:110
>>
>> Allocatable:
>>
>>  cpu: 16
>>
>>  memory:  125724888Ki
>>
>>  pods:110
>>
>> System Info:
>>
>>  Machine ID: *
>>
>>  System UUID:**
>>
>>  Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f
>>
>>  Kernel Version: 4.4.0-1062-aws
>>
>>  OS Image:   Ubuntu 16.04.4 LTS
>>
>>  Operating System:   linux
>>
>>  Architecture:   amd64
>>
>>  Container Runtime Version:  docker://Unknown
>>
>>  Kubelet Version:v1.8.3
>>
>>  Kube-Proxy Version: v1.8.3
>>
>> PodCIDR: **
>>
>> ExternalID:  **
>>
>> Non-terminated Pods: (11 in total)
>>
>>   Namespace  Name
>>CPU Requests  CPU Limits  Memory Requests  Memory
>> Limits
>>
>>   -  
>>  --  ---
>>  -
>>
>>   kube-systemcalico-node-gj5mb
>> 250m (1%) 0 (0%)  0 (0%)   0 (0%)
>>
>>   kube-system
>>  kube-proxy- 100m (0%)
>> 0 (0%)  0 (0%)   0 (0%)
>>
>>   kube-system
>>  prometheus-prometheus-node-exporter-9cntq   100m (0%)
>> 200m (1%)   30Mi (0%)50Mi (0%)
>>
>>   logging
>>  elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
>> 1 (6%)  8Gi (6%) 16Gi (13%)
>>
>>   logging

Re: spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-16 Thread purna pradeep
Hello,

I'm running a Spark 2.3 job on a Kubernetes cluster
>
> kubectl version
>
> Client Version: version.Info{Major:"1", Minor:"9",
> GitVersion:"v1.9.3", GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b",
> GitTreeState:"clean", BuildDate:"2018-02-09T21:51:06Z",
> GoVersion:"go1.9.4", Compiler:"gc", Platform:"darwin/amd64"}
>
> Server Version: version.Info{Major:"1", Minor:"8",
> GitVersion:"v1.8.3", GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd",
> GitTreeState:"clean", BuildDate:"2017-11-08T18:27:48Z",
> GoVersion:"go1.8.3", Compiler:"gc", Platform:"linux/amd64"}
>
>
>
> When I run spark-submit against the k8s master, the driver pod gets stuck
> in the Waiting: PodInitializing state.
> I had to manually kill the driver pod and submit a new job in this case;
> then it works. How can this be handled in production?
>

https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-25128


>
> This happens if I submit the jobs almost in parallel, i.e., submit 5 jobs
> one after the other simultaneously.
>
> I'm running Spark jobs on 20 nodes, each having the below configuration.
>
> I tried kubectl describe node on the node where the driver pod is running,
> and this is what I got. I do see there is overcommit on resources, but I
> expected the Kubernetes scheduler not to schedule if resources on a node are
> overcommitted or the node is in Not Ready state. In this case the node is in
> Ready state, but I observe the same behaviour if the node is in "Not Ready" state.
>
>
>
> Name:   **
>
> Roles:  worker
>
> Labels: beta.kubernetes.io/arch=amd64
>
> beta.kubernetes.io/os=linux
>
> kubernetes.io/hostname=
>
> node-role.kubernetes.io/worker=true
>
> Annotations:node.alpha.kubernetes.io/ttl=0
>
>
> volumes.kubernetes.io/controller-managed-attach-detach=true
>
> Taints: 
>
> CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400
>
> Conditions:
>
>   Type Status  LastHeartbeatTime
> LastTransitionTimeReason   Message
>
>    --  -
> ----   ---
>
>   OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
> sufficient disk space available
>
>   MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
> sufficient memory available
>
>   DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
> Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
> pressure
>
>   ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
> Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
> ready status. AppArmor enabled
>
> Addresses:
>
>   InternalIP:  *
>
>   Hostname:**
>
> Capacity:
>
>  cpu: 16
>
>  memory:  125827288Ki
>
>  pods:110
>
> Allocatable:
>
>  cpu: 16
>
>  memory:  125724888Ki
>
>  pods:110
>
> System Info:
>
>  Machine ID: *
>
>  System UUID:**
>
>  Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f
>
>  Kernel Version: 4.4.0-1062-aws
>
>  OS Image:   Ubuntu 16.04.4 LTS
>
>  Operating System:   linux
>
>  Architecture:   amd64
>
>  Container Runtime Version:  docker://Unknown
>
>  Kubelet Version:v1.8.3
>
>  Kube-Proxy Version: v1.8.3
>
> PodCIDR: **
>
> ExternalID:  **
>
> Non-terminated Pods: (11 in total)
>
>   Namespace  Name
>CPU Requests  CPU Limits  Memory Requests  Memory
> Limits
>
>   -  
>  --  ---
>  -
>
>   kube-systemcalico-node-gj5mb
>   250m (1%) 0 (0%)  0 (0%)   0 (0%)
>
>   kube-system
>  kube-proxy- 100m (0%)
> 0 (0%)  0 (0%)   0 (0%)
>
>   kube-systemprometheus-prometheus-node-exporter-9cntq
>   100m (0%) 200m (1%)   30Mi (0%)50Mi (0%)
>
>   logging
>  elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
> 1 (6%)  8Gi (6%) 16Gi (13%)
>
>   loggingfluentd-fluentd-elasticsearch-tj7nd
>   200m (1%) 0 (0%)  612Mi (0%)   0 (0%)
>
>   rook   rook-agent-6jtzm
>0 (0%)0 (0%)  0 (0%)   0 (0%)

spark driver pod stuck in Waiting: PodInitializing state in Kubernetes

2018-08-15 Thread purna pradeep
I'm running a Spark 2.3 job on a Kubernetes cluster.

kubectl version

Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.3",
GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean",
BuildDate:"2018-02-09T21:51:06Z", GoVersion:"go1.9.4", Compiler:"gc",
Platform:"darwin/amd64"}

Server Version: version.Info{Major:"1", Minor:"8", GitVersion:"v1.8.3",
GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean",
BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc",
Platform:"linux/amd64"}



When I run spark-submit against the k8s master, the driver pod gets stuck in
the Waiting: PodInitializing state.
I had to manually kill the driver pod and submit a new job in this case; then
it works.


This happens if I submit the jobs almost in parallel, i.e., submit 5 jobs one
after the other simultaneously.

I'm running Spark jobs on 20 nodes, each having the below configuration.

I tried kubectl describe node on the node where the driver pod is running,
and this is what I got. I do see there is overcommit on resources, but I
expected the Kubernetes scheduler not to schedule if resources on a node are
overcommitted or the node is in Not Ready state. In this case the node is in
Ready state, but I observe the same behaviour if the node is in "Not Ready" state.



Name:   **

Roles:  worker

Labels: beta.kubernetes.io/arch=amd64

beta.kubernetes.io/os=linux

kubernetes.io/hostname=

node-role.kubernetes.io/worker=true

Annotations:node.alpha.kubernetes.io/ttl=0


volumes.kubernetes.io/controller-managed-attach-detach=true

Taints: 

CreationTimestamp:  Tue, 31 Jul 2018 09:59:24 -0400

Conditions:

  Type Status  LastHeartbeatTime
LastTransitionTimeReason   Message

   --  -
----   ---

  OutOfDiskFalse   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
Jul 2018 09:59:24 -0400   KubeletHasSufficientDisk kubelet has
sufficient disk space available

  MemoryPressure   False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
Jul 2018 09:59:24 -0400   KubeletHasSufficientMemory   kubelet has
sufficient memory available

  DiskPressure False   Tue, 14 Aug 2018 09:31:20 -0400   Tue, 31
Jul 2018 09:59:24 -0400   KubeletHasNoDiskPressure kubelet has no disk
pressure

  ReadyTrueTue, 14 Aug 2018 09:31:20 -0400   Sat, 11
Aug 2018 00:41:27 -0400   KubeletReady kubelet is posting
ready status. AppArmor enabled

Addresses:

  InternalIP:  *

  Hostname:**

Capacity:

 cpu: 16

 memory:  125827288Ki

 pods:110

Allocatable:

 cpu: 16

 memory:  125724888Ki

 pods:110

System Info:

 Machine ID: *

 System UUID:**

 Boot ID:1493028d-0a80-4f2f-b0f1-48d9b8910e9f

 Kernel Version: 4.4.0-1062-aws

 OS Image:   Ubuntu 16.04.4 LTS

 Operating System:   linux

 Architecture:   amd64

 Container Runtime Version:  docker://Unknown

 Kubelet Version:v1.8.3

 Kube-Proxy Version: v1.8.3

PodCIDR: **

ExternalID:  **

Non-terminated Pods: (11 in total)

  Namespace  Name
 CPU Requests  CPU Limits  Memory Requests  Memory
Limits

  -  
   --  ---
 -

  kube-systemcalico-node-gj5mb
  250m (1%) 0 (0%)  0 (0%)   0 (0%)

  kube-system
 kube-proxy- 100m (0%)
0 (0%)  0 (0%)   0 (0%)

  kube-systemprometheus-prometheus-node-exporter-9cntq
  100m (0%) 200m (1%)   30Mi (0%)50Mi (0%)

  logging
 elasticsearch-elasticsearch-data-69df997486-gqcwg   400m (2%)
1 (6%)  8Gi (6%) 16Gi (13%)

  loggingfluentd-fluentd-elasticsearch-tj7nd
  200m (1%) 0 (0%)  612Mi (0%)   0 (0%)

  rook   rook-agent-6jtzm
 0 (0%)0 (0%)  0 (0%)   0 (0%)

  rook
rook-ceph-osd-10-6-42-250.accel.aws-cardda.cb4good.com-gwb8j0 (0%)
   0 (0%)  0 (0%)   0 (0%)

  spark
 accelerate-test-5-a3bfb8a597e83d459193a183e17f13b5-exec-1   2 (12%)
0 (0%)  10Gi (8%)12Gi (10%)

  spark
 accelerate-testing-1-8ed0482f3bfb3c0a83da30bb7d433dff-exec-52 (12%)
0 (0%)  10Gi (8%)12Gi 

Re: Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-31 Thread purna pradeep
$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)

at java.lang.Thread.run(Thread.java:748)

2018-07-30 19:58:42 INFO  BlockManagerMasterEndpoint:54 - Trying to remove
executor 7 from BlockManagerMaster.

2018-07-30 19:58:42 WARN  BlockManagerMasterEndpoint:66 - No more replicas
available for rdd_9_37 !

MasterEndpoint:54 - Removing block manager BlockManagerId(7, 10.*.*.*.*,
43888, None)

2018-07-30 19:58:42 INFO  BlockManagerMaster:54 - Removed 7 successfully in
removeExecutor

2018-07-30 19:58:42 INFO  DAGScheduler:54 - Shuffle files lost for
executor: 7 (epoch 1)

2018-07-30 19:58:42 ERROR ContextCleaner:91 - Error cleaning broadcast 11

org.apache.spark.SparkException: Exception thrown in awaitResult:

at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)

at
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

at
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:155)

at
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:321)

at
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)

at
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:66)

at
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:238)

at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:194)

at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$1.apply(ContextCleaner.scala:185)

at scala.Option.foreach(Option.scala:257)

at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:185)

at
org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1319)

at org.apache.spark.ContextCleaner.org
$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:178)

at
org.apache.spark.ContextCleaner$$anon$1.run(ContextCleaner.scala:73)

Caused by: java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

at
sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)

at sun.nio.ch.IOUtil.read(IOUtil.java:192)

at
sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)

at
io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)

at
io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)

at
io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)

at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)

at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)

at
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

at
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)

at
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)

at java.lang.Thread.run(Thread.java:

On Tue, Jul 31, 2018 at 8:32 AM purna pradeep 
wrote:

>
> Hello,
>>
>>
>>
>> I’m getting the below error in the spark driver pod logs, and executor pods
>> are getting killed midway through while the job is running. Even the driver
>> pod terminated with the below intermittent error; this happens if I run
>> multiple jobs in parallel.
>>
>>
>>
>> I'm not able to see executor logs as the executor pods are killed.
>>
>>
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 23 in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in
>> stage 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure
>> (executor 1 exited caused by one of the running tasks) Reason: Executor
>> lost for unknown reasons.
>>
>> Driver stacktrace:
>>
>> at org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>>
>> at
>> org.apache.spark.scheduler

Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-31 Thread purna pradeep
> Hello,
>
>
>
> I’m getting the below error in the spark driver pod logs, and executor pods
> are getting killed midway through while the job is running. Even the driver
> pod terminated with the below intermittent error; this happens if I run
> multiple jobs in parallel.
>
>
>
> I'm not able to see executor logs as the executor pods are killed.
>
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 23
> in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage
> 36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1
> exited caused by one of the running tasks) Reason: Executor lost for
> unknown reasons.
>
> Driver stacktrace:
>
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
>
> at scala.Option.foreach(Option.scala:257)
>
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
>
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
>
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
> at
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
>
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
>
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
>
> ... 42 mor
>


Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-30 Thread purna pradeep
Hello,



I’m getting the below error in the spark driver pod logs, and executor pods
are getting killed midway through while the job is running. Even the driver
pod terminated with the below intermittent error; this happens if I run
multiple jobs in parallel.



I'm not able to see executor logs as the executor pods are killed.



org.apache.spark.SparkException: Job aborted due to stage failure: Task 23
in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage
36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1
exited caused by one of the running tasks) Reason: Executor lost for
unknown reasons.

Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)

at scala.Option.foreach(Option.scala:257)

at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)

at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)

... 42 mor


Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-30 Thread Mamillapalli, Purna Pradeep
Hello,

I’m getting the below error in the spark driver pod logs, and executor pods are
getting killed midway through while the job is running. Even the driver pod
terminated with the below intermittent error; this happens if I run multiple
jobs in parallel.

I'm not able to see executor logs as the executor pods are killed.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in 
stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage 36.0 
(TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1 exited 
caused by one of the running tasks) Reason: Executor lost for unknown reasons.
Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
... 42 more


Thanks,
Purna




Spark 2.3 Kubernetes error

2018-07-06 Thread purna pradeep
> Hello,
>
>
>
> When I’m trying to set the below options on the spark-submit command on the
> k8s master, I get the below error in the spark-driver pod logs:
>
>
>
> --conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost
> -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
>
> --conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost
> -Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \
>
>
>
> But when I set these extraJavaOptions as system properties in the
> Spark application jar, everything works fine.
>
>
>
> 2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing
> SparkContext.
>
> org.apache.spark.SparkException: External scheduler cannot be instantiated
>
> at
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
>
> at
> org.apache.spark.SparkContext.init(SparkContext.scala:492)
>
> at
> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
>
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
>
> at
> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
>
> at scala.Option.getOrElse(Option.scala:121)
>
> at
> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
>
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> Operation: [get]  for kind: [Pod]  with name:
> [test-657e2f715ada3f91ae32c588aa178f63-driver]  in namespace: [test]
> failed.
>
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)
>
> at
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)
>
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)
>
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)
>
> at
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)
>
> at
> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)
>
> at
> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)
>
> ... 12 more
>
> Caused by: javax.net.ssl.SSLHandshakeException:
> sun.security.validator.ValidatorException: PKIX path building failed:
> sun.security.provider.certpath.SunCertPathBuilderException: unable to find
> valid certification path to requested target
>
> at
> sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>
> at
> sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)
>
> at
> sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)
>
> at
> sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>
> at
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)
>
> at
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)
>
> at
> sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
>
> at
> sun.security.ssl.Handshaker.process_record(Handshaker.java:961)
>
> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)
>
> at
> sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)
>
> at
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)
>
> at
> sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)
>
> at
> okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)
>
> at
> okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)
>
> at
> okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)
>
> at
> okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)
>
> at
> okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)
>
> at
> okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)
>
> at
> okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
>
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
>
> at
> okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
>
> at
> okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
>
> at
> 

Spark 2.3 Kubernetes error

2018-07-05 Thread purna pradeep
Hello,



When I’m trying to set the below options on the spark-submit command on the
k8s master, I get the below error in the spark-driver pod logs:



--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \



But when I set these extraJavaOptions as system properties in the Spark
application jar, everything works fine.
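
For reference, the in-application workaround described above might look
roughly like the sketch below (the proxy host/port values are the same
placeholders used in the spark-submit flags above):

// Rough sketch of setting the proxy settings as JVM system properties from
// inside the application jar instead of via extraJavaOptions
// (host/port/protocol values are placeholders).
import org.apache.spark.sql.SparkSession

object ProxyAwareApp {
  def main(args: Array[String]): Unit = {
    System.setProperty("https.proxyHost", "myhost")
    System.setProperty("https.proxyPort", "8099")
    System.setProperty("https.protocols", "TLSv1.2")

    val spark = SparkSession.builder().appName("proxy-aware-app").getOrCreate()
    // ... application logic goes here ...
    spark.stop()
  }
}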



2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.

org.apache.spark.SparkException: External scheduler cannot be instantiated

at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)

at
org.apache.spark.SparkContext.init(SparkContext.scala:492)

at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)

at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)

at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)

at scala.Option.getOrElse(Option.scala:121)

at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)

Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
Operation: [get]  for kind: [Pod]  with name:
[test-657e2f715ada3f91ae32c588aa178f63-driver]  in namespace: [test]
failed.

at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)

at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)

at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)

at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)

at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)

at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)

at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)

... 12 more

Caused by: javax.net.ssl.SSLHandshakeException:
sun.security.validator.ValidatorException: PKIX path building failed:
sun.security.provider.certpath.SunCertPathBuilderException: unable to find
valid certification path to requested target

at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)

at
sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)

at
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)

at
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)

at
sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)

at
sun.security.ssl.Handshaker.process_record(Handshaker.java:961)

at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)

at
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)

at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)

at
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)

at
okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)

at
okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)

at
okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)

at
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)

at
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)

at
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)

at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at

Spark 2.3 Kubernetes error

2018-07-05 Thread Mamillapalli, Purna Pradeep
Hello,

When I’m trying to set the below options on the spark-submit command on the k8s 
master, I get the below error in the spark-driver pod logs:



--conf spark.executor.extraJavaOptions=" -Dhttps.proxyHost=myhost 
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \

--conf spark.driver.extraJavaOptions="--Dhttps.proxyHost=myhost 
-Dhttps.proxyPort=8099 -Dhttp.useproxy=true -Dhttps.protocols=TLSv1.2" \


But when I set these extraJavaOptions as system properties in the Spark 
application jar, everything works fine.


2018-06-11 21:26:28 ERROR SparkContext:91 - Error initializing SparkContext.

org.apache.spark.SparkException: External scheduler cannot be instantiated

at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)

at 
org.apache.spark.SparkContext.init(SparkContext.scala:492)

at 
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)

at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)

at 
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)

at scala.Option.getOrElse(Option.scala:121)

at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: 
[get]  for kind: [Pod]  with name: 
[test-657e2f715ada3f91ae32c588aa178f63-driver]  in namespace: [test]  failed.

at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)

at 
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)

at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)

at 
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)

at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.init(KubernetesClusterSchedulerBackend.scala:70)

at 
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)

at 
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)

... 12 more

Caused by: javax.net.ssl.SSLHandshakeException: 
sun.security.validator.ValidatorException: PKIX path building failed: 
sun.security.provider.certpath.SunCertPathBuilderException: unable to find 
valid certification path to requested target

at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)

at sun.security.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1959)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:302)

at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)

at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1514)

at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:216)

at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)

at 
sun.security.ssl.Handshaker.process_record(Handshaker.java:961)

at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:1072)

at 
sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1385)

at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1413)

at 
sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1397)

at 
okhttp3.internal.connection.RealConnection.connectTls(RealConnection.java:281)

at 
okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:251)

at 
okhttp3.internal.connection.RealConnection.connect(RealConnection.java:151)

at 
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:195)

at 
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)

at 
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)

at 
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at 
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)

at 
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)

at 

Spark 2.3 driver pod stuck in Running state — Kubernetes

2018-06-08 Thread purna pradeep
Hello,

When I run spark-submit on the k8s cluster, I'm seeing the driver pod stuck in
the Running state, and when I pulled the driver pod logs I see the log below.

I do understand that this warning might be because of a lack of CPU/memory,
but I would expect the driver pod to be in the “Pending” state rather than the
“Running” state, since it isn't actually running.

So I had to kill the driver pod and resubmit the job.

Please suggest!

2018-06-08 14:38:01 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:38:16 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:38:31 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:38:46 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources

2018-06-08 14:39:01 WARN TaskSchedulerImpl:66 - Initial job has not
accepted any resources; check your cluster UI to ensure that workers are
registered and have sufficient resources


spark partitionBy with partitioned column in json output

2018-06-04 Thread purna pradeep
I'm reading the below JSON in Spark:

{"bucket": "B01", "actionType": "A1", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B02", "actionType": "A2", "preaction": "NULL", "postaction": "NULL"}
{"bucket": "B03", "actionType": "A3", "preaction": "NULL", "postaction": "NULL"}

val df = spark.read.json("actions.json").toDF()

Now I'm writing the same to a JSON output as below:

df.write.format("json").mode("append")
  .partitionBy("bucket", "actionType").save("output.json")

and the output.json is as below:

{"preaction":"NULL","postaction":"NULL"}

The bucket and actionType columns are missing in the JSON output; I need the
partitionBy columns in the output as well.
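
This is expected behaviour for partitionBy: the partition columns are encoded
in the output directory layout (e.g. output.json/bucket=B01/actionType=A1/...)
rather than inside each JSON record. One possible workaround, sketched below,
is to duplicate the columns before writing so a copy stays in the data; the
"*_part" column names are made up for the example:

// Sketch of one workaround: partition on duplicated columns so that the
// original bucket/actionType values remain inside each JSON record.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("partitionby-sketch").getOrCreate()
val df = spark.read.json("actions.json")

df.withColumn("bucket_part", col("bucket"))
  .withColumn("actionType_part", col("actionType"))
  .write.format("json").mode("append")
  .partitionBy("bucket_part", "actionType_part")
  .save("output.json")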


Re: Spark 2.3 error on Kubernetes

2018-05-29 Thread purna pradeep
Anirudh,

Thanks for your response.

I’m running the k8s cluster on AWS and the kube-dns pods are running fine.
Also, as I mentioned, only 1 executor pod is running though I requested 5;
the other 4 were killed with the below error, and I do have enough resources
available.

On Tue, May 29, 2018 at 6:28 PM Anirudh Ramanathan 
wrote:

> This looks to me like a kube-dns error that's causing the driver DNS
> address to not resolve.
> It would be worth double checking that kube-dns is indeed running (in the
> kube-system namespace).
> Often, with environments like minikube, kube-dns may exit/crashloop due to
> lack of resource.
>
> On Tue, May 29, 2018 at 3:18 PM, purna pradeep 
> wrote:
>
>> Hello,
>>
>> I’m getting the below error when I spark-submit a Spark 2.3 app on
>> Kubernetes v1.8.3; some of the executor pods were killed with the below
>> error as soon as they come up:
>>
>> Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
>>
>> at
>> org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
>>
>> Caused by: org.apache.spark.SparkException: Exception thrown in
>> awaitResult:
>>
>> at
>> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>>
>> at
>> org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>>
>> at
>> org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
>>
>> at
>> org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
>>
>> at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
>>
>> at
>> org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
>>
>> at java.security.AccessController.doPrivileged(Native
>> Method)
>>
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>>
>> ... 4 more
>>
>> Caused by: java.io.IOException: Failed to connect to
>> spark-1527629824987-driver-svc.spark.svc:7078
>>
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
>>
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
>>
>> at
>> org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
>>
>> at
>> org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
>>
>> at
>> org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
>>
>> at
>> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.net.UnknownHostException:
>> spark-1527629824987-driver-svc.spark.svc
>>
>> at
>> java.net.InetAddress.getAllByName0(InetAddress.java:1280)
>>
>> at
>> java.net.InetAddress.getAllByName(InetAddress.java:1192)
>>
>> at
>> java.net.InetAddress.getAllByName(InetAddress.java:1126)
>>
>> at java.net.InetAddress.getByName(InetAddress.java:1076)
>>
>> at
>> io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
>>
>> at
>> io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
>>
>> at java.security.AccessController.doPri

Spark 2.3 error on Kubernetes

2018-05-29 Thread purna pradeep
Hello,

I’m getting the below error when I spark-submit a Spark 2.3 app on Kubernetes
v1.8.3; some of the executor pods were killed with the below error as soon
as they come up:

Exception in thread "main" java.lang.reflect.UndeclaredThrowableException

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)

at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)

Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:

at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)

at
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)

at
org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)

at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)

at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)

at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)

at java.security.AccessController.doPrivileged(Native
Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

... 4 more

Caused by: java.io.IOException: Failed to connect to
spark-1527629824987-driver-svc.spark.svc:7078

at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)

at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)

at
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)

at
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)

at
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)

at java.util.concurrent.FutureTask.run(FutureTask.java:266)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: java.net.UnknownHostException:
spark-1527629824987-driver-svc.spark.svc

at java.net.InetAddress.getAllByName0(InetAddress.java:1280)

at java.net.InetAddress.getAllByName(InetAddress.java:1192)

at java.net.InetAddress.getAllByName(InetAddress.java:1126)

at java.net.InetAddress.getByName(InetAddress.java:1076)

at
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)

at
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)

at java.security.AccessController.doPrivileged(Native
Method)

at
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)

at
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)

at
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)

at
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)

at
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)

at
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)

at
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)

at
io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)

at
io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)

at
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)

at
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)

at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)

at
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)

at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)

at
io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)

at
io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)

at

Spark 2.3 error on kubernetes

2018-05-29 Thread Mamillapalli, Purna Pradeep
Hello,


I’m getting the below intermittent error when I spark-submit a Spark 2.3 app on 
Kubernetes v1.8.3; some of the executor pods were killed with the below error as 
soon as they come up:


Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at 
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at 
org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
... 4 more
Caused by: java.io.IOException: Failed to connect to 
spark-1527629824987-driver-svc.spark.svc:7078
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at 
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at 
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: 
spark-1527629824987-driver-svc.spark.svc
at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at 
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
at 
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
at java.security.AccessController.doPrivileged(Native Method)
at 
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
at 
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
at 
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
at 
io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at 
io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at 
io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
at 

Spark 2.3 error on kubernetes

2018-05-29 Thread Mamillapalli, Purna Pradeep
Hello,


I’m getting the below intermittent error when I spark-submit a Spark 2.3 app on
Kubernetes v1.8.3; some of the executor pods were killed with the error below as
soon as they came up:


Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
at 
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:64)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:293)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at 
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at 
org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at 
org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at 
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:201)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:65)
at 
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
... 4 more
Caused by: java.io.IOException: Failed to connect to 
spark-1527629824987-driver-svc.spark.svc:7078
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:245)
at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at 
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:198)
at 
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:194)
at 
org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:190)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.UnknownHostException: 
spark-1527629824987-driver-svc.spark.svc
at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
at java.net.InetAddress.getAllByName(InetAddress.java:1192)
at java.net.InetAddress.getAllByName(InetAddress.java:1126)
at java.net.InetAddress.getByName(InetAddress.java:1076)
at 
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:146)
at 
io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:143)
at java.security.AccessController.doPrivileged(Native Method)
at 
io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:143)
at 
io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:43)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:63)
at 
io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:55)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:57)
at 
io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:32)
at 
io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:108)
at 
io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:208)
at io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:49)
at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:188)
at 
io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:174)
at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at 
io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at 
io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:82)
at 

Spark driver pod garbage collection

2018-05-23 Thread purna pradeep
Hello,

Currently I observe that dead pods are not getting garbage collected (i.e. spark
driver pods which have completed execution), so pods could potentially sit in the
namespace for weeks. This makes listing, parsing, and reading pods slower, as
well as leaving junk on the cluster.

I believe the minimum-container-ttl-duration kubelet flag is set to 0 minutes by
default, but I don’t see the completed spark driver pods being garbage collected.

Do I need to set any flag explicitly at the kubelet level?


Spark driver pod eviction Kubernetes

2018-05-22 Thread purna pradeep
Hi,

What would be the recommended approach to wait for the spark driver pod to
complete the currently running job before it gets evicted to new nodes while
maintenance on the current node is going on (kernel upgrade, hardware
maintenance, etc.) using the drain command?

I don’t think I can use a PodDisruptionBudget, as the Spark pods' deployment
yaml(s) are taken care of by Kubernetes.

Please suggest !


Oozie with spark 2.3 in Kubernetes

2018-05-11 Thread purna pradeep
Hello,

Would like to know if anyone has tried Oozie with Spark 2.3 actions on
Kubernetes for scheduling Spark jobs.


Thanks,
Purna


Re: Scala program to spark-submit on k8 cluster

2018-04-04 Thread purna pradeep
Yes, a “REST application that submits a Spark job to a k8s cluster by running
spark-submit programmatically”, and I would also like to expose it as a
Kubernetes service so that clients can access it like any other REST API.
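
For what it's worth, a minimal sketch of the "submit programmatically" part using
Spark's launcher API (org.apache.spark.launcher.SparkLauncher). The Spark home,
master URL, image and jar names below are placeholders, not values from this
thread:

    import org.apache.spark.launcher.SparkLauncher

    object SubmitToK8s {
      def main(args: Array[String]): Unit = {
        // Launches a spark-submit process against a k8s master; setSparkHome must
        // point at a Spark 2.3 distribution on the machine running this code.
        val handle = new SparkLauncher()
          .setSparkHome("/opt/spark")                           // placeholder
          .setMaster("k8s://https://my-k8s-apiserver:6443")     // placeholder
          .setDeployMode("cluster")
          .setMainClass("com.example.MySparkJob")               // placeholder
          .setAppResource("local:///opt/spark/jars/my-app.jar") // placeholder
          .setConf("spark.kubernetes.container.image", "my-registry/spark:2.3.0")
          .startApplication()

        // startApplication() returns a SparkAppHandle that can be polled or given
        // listeners, which is handy when wrapping this call in a REST endpoint.
        while (!handle.getState.isFinal) Thread.sleep(1000)
        println(s"Final state: ${handle.getState}")
      }
    }

Wrapping that call in a small HTTP service and deploying it behind a Kubernetes
Service would give the "REST API that clients can call" part.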

On Wed, Apr 4, 2018 at 12:25 PM Yinan Li  wrote:

> Hi Kittu,
>
> What do you mean by "a Scala program"? Do you mean a program that submits
> a Spark job to a k8s cluster by running spark-submit programmatically, or
> some example Scala application that is to run on the cluster?
>
> On Wed, Apr 4, 2018 at 4:45 AM, Kittu M  wrote:
>
>> Hi,
>>
>> I’m looking for a Scala program to spark submit a Scala application
>> (spark 2.3 job) on k8 cluster .
>>
>> Any help  would be much appreciated. Thanks
>>
>>
>>
>


unsubscribe

2018-04-02 Thread purna pradeep
unsubscribe


unsubscribe

2018-03-28 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2018-03-28 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster

2018-03-21 Thread purna pradeep
Thanks Yinan,

Looks like this is still in alpha.

Would like to know if there is any REST interface for Spark 2.3 job
submission, similar to Spark 2.2, as I need to submit Spark applications to
the k8s master based on different events (cron or S3 file based triggers).

On Tue, Mar 20, 2018 at 11:50 PM Yinan Li <liyinan...@gmail.com> wrote:

> One option is the Spark Operator
> <https://github.com/GoogleCloudPlatform/spark-on-k8s-operator>. It allows
> specifying and running Spark applications on Kubernetes using Kubernetes
> custom resources objects. It takes SparkApplication CRD objects and
> automatically submits the applications to run on a Kubernetes cluster.
>
> Yinan
>
> On Tue, Mar 20, 2018 at 7:47 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Im using kubernetes cluster on AWS to run spark jobs ,im using spark 2.3
>> ,now i want to run spark-submit from AWS lambda function to k8s
>> master,would like to know if there is any REST interface to run Spark
>> submit on k8s Master
>
>
>


Rest API for Spark2.3 submit on kubernetes(version 1.8.*) cluster

2018-03-20 Thread purna pradeep
I'm using a Kubernetes cluster on AWS to run Spark jobs, and I'm using Spark 2.3.
Now I want to run spark-submit from an AWS Lambda function to the k8s master, and
would like to know if there is any REST interface to run spark-submit on the k8s
master.


Re: Spark 2.3 submit on Kubernetes error

2018-03-12 Thread purna pradeep
Thanks Yinan,

I’m able to get the kube-dns endpoints when I run this command:

kubectl get ep kube-dns --namespace=kube-system

Do I need to deploy under the kube-system namespace instead of the default
namespace?

And please let me know if you have any insights on Error 1.

On Sun, Mar 11, 2018 at 8:26 PM Yinan Li <liyinan...@gmail.com> wrote:

> Spark on Kubernetes requires the presence of the kube-dns add-on properly
> configured. The executors connect to the driver through a headless
> Kubernetes service using the DNS name of the service. Can you check if you
> have the add-on installed in your cluster? This issue
> https://github.com/apache-spark-on-k8s/spark/issues/558 might help.
>
>
> On Sun, Mar 11, 2018 at 5:01 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Getting below errors when I’m trying to run spark-submit on k8 cluster
>>
>>
>> *Error 1*:This looks like a warning it doesn’t interrupt the app running
>> inside executor pod but keeps on getting this warning
>>
>>
>> 2018-03-09 11:15:21 WARN  WatchConnectionManager:192 - Exec Failure
>> java.io.EOFException
>>    at okio.RealBufferedSource.require(RealBufferedSource.java:60)
>>    at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)
>>    at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)
>>    at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)
>>    at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)
>>    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)
>>    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
>>    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
>>    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>    at java.lang.Thread.run(Thread.java:748)
>>
>>
>>
>> *Error2:* This is intermittent error  which is failing the executor pod
>> to run
>>
>>
>> *org.apache.spark.SparkException: External scheduler cannot be
>> instantiated*
>> * at
>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)*
>> * at org.apache.spark.SparkContext.(SparkContext.scala:492)*
>> * at
>> org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)*
>> * at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)*
>> * at
>> org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)*
>> * at scala.Option.getOrElse(Option.scala:121)*
>> * at
>> org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)*
>> * at
>> com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)*
>> * at
>> com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)*
>> * at
>> com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)*
>> * at
>> com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)*
>> * at
>> com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)*
>> *Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
>> Operation: [get]  for kind: [Pod]  with name:
>> [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver]  in namespace:
>> [default]  failed.*
>> * at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)*
>> * at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)*
>> * at
>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)*
>> * at
>> io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)*
>> * at
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70)*
>> * at
>> org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)*
>> * at
>> org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$cr

Spark 2.3 submit on Kubernetes error

2018-03-11 Thread purna pradeep
Getting the below errors when I'm trying to run spark-submit on a k8s cluster:


Error 1: This looks like a warning; it doesn't interrupt the app running
inside the executor pod, but I keep getting this warning:


2018-03-09 11:15:21 WARN  WatchConnectionManager:192 - Exec Failure
java.io.EOFException
   at okio.RealBufferedSource.require(RealBufferedSource.java:60)
   at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)
   at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)
   at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)
   at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)
   at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)
   at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
   at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)



Error 2: This is an intermittent error which causes the executor pod to fail
to run:


*org.apache.spark.SparkException: External scheduler cannot be
instantiated*
* at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)*
* at org.apache.spark.SparkContext.(SparkContext.scala:492)*
* at
org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)*
* at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)*
* at
org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)*
* at scala.Option.getOrElse(Option.scala:121)*
* at
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)*
* at
com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)*
* at
com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)*
* at
com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)*
* at
com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)*
* at
com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)*
*Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
Operation: [get]  for kind: [Pod]  with name:
[myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver]  in namespace:
[default]  failed.*
* at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:62)*
* at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:71)*
* at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:228)*
* at
io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:184)*
* at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.(KubernetesClusterSchedulerBackend.scala:70)*
* at
org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:120)*
* at
org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2741)*
* ... 11 more*
*Caused by: java.net.UnknownHostException: kubernetes.default.svc: Try
again*
* at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)*
* at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)*
* at
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)*
* at java.net.InetAddress.getAllByName0(InetAddress.java:1276)*
* at java.net.InetAddress.getAllByName(InetAddress.java:1192)*
* at java.net.InetAddress.getAllByName(InetAddress.java:1126)*
* at okhttp3.Dns$1.lookup(Dns.java:39)*
* at
okhttp3.internal.connection.RouteSelector.resetNextInetSocketAddress(RouteSelector.java:171)*
* at
okhttp3.internal.connection.RouteSelector.nextProxy(RouteSelector.java:137)*
* at
okhttp3.internal.connection.RouteSelector.next(RouteSelector.java:82)*
* at
okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:171)*
* at
okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:121)*
* at
okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:100)*
* at
okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)*
* at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)*
* at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)*
* at
okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)*
* at
okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)*
* 

handling Remote dependencies for spark-submit in spark 2.3 with kubernetes

2018-03-08 Thread purna pradeep
I'm trying to run spark-submit against a Kubernetes cluster with the Spark 2.3
Docker container image.

The challenge I'm facing is that the application has a mainapplication.jar and
other dependency files and jars which are located in a remote location like AWS
S3. As per the Spark 2.3 documentation there is a Kubernetes init-container to
download remote dependencies, but in this case I'm not creating any Pod spec to
include init-containers in Kubernetes; as per the documentation, Spark 2.3 on
Kubernetes internally creates the pods (driver, executor). So I'm not sure how I
can use the init-container for spark-submit when there are remote dependencies.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#using-remote-dependencies

Please suggest
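
For what it's worth, my understanding (not verified end to end) is that you don't
write the init-container Podspec yourself: if the app resource / spark.jars /
spark.files point at remote URIs and an init-container image is configured, Spark
2.3 is supposed to inject the init-container into the driver and executor pods it
creates. A sketch using the launcher API, with placeholder names; the property
keys are the ones listed in the 2.3 running-on-kubernetes docs, so double-check
them against your build:

    import org.apache.spark.launcher.SparkLauncher

    // Sketch only: remote jars/files are listed as URIs; the init-container is
    // expected to download them before the main containers start.
    val handle = new SparkLauncher()
      .setMaster("k8s://https://my-k8s-apiserver:6443")           // placeholder
      .setDeployMode("cluster")
      .setMainClass("com.example.MainApplication")                // placeholder
      .setAppResource("s3a://my-bucket/jars/mainapplication.jar") // placeholder
      .setConf("spark.kubernetes.container.image", "my-registry/spark:2.3.0")
      .setConf("spark.kubernetes.initContainer.image", "my-registry/spark:2.3.0")
      .setConf("spark.jars", "s3a://my-bucket/jars/dep1.jar,s3a://my-bucket/jars/dep2.jar")
      .setConf("spark.files", "s3a://my-bucket/conf/app.conf")
      .startApplication()

The image used for the init-container (and the driver/executors) also needs the
hadoop-aws / S3A classes and credentials available for s3a:// downloads to work.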


Unsubscribe

2018-02-27 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2018-02-26 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Unsubscribe

2018-02-26 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Unsubscribe

2018-02-11 Thread purna pradeep
Unsubscribe


Executor not getting added SparkUI & Spark Eventlog in deploymode:cluster

2017-11-14 Thread Mamillapalli, Purna Pradeep
Hi all,

I'm performing spark-submit using the Spark REST API POST operation on port 6066
with the below config:

> Launch Command:
> "/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.141-1.b16.el7_3.x86_64/jre/bin/java"
> "-cp" "/usr/local/spark/conf/:/usr/local/spark/jars/*" "-Xmx4096M"
> "-Dspark.eventLog.enabled=true"
> "-Dspark.app.name=WorkflowApp"
> "-Dspark.submit.deployMode=cluster"
> "-Dspark.local.dir=/data0,/data1,/data2,/data3"
> "-Dspark.executor.cores=2" "-Dspark.master=spark://:7077"
> "-Dspark.serializer=org.apache.spark.serializer.KryoSerializer"
> "-Dspark.jars=s3a://<***>.jar" "-Dspark.driver.supervise=false"
> "-Dspark.history.fs.logDirectory=s3a://<*>/"
> "-Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256"
> "-Dspark.driver.memory=4G" "-Dspark.executor.memory=4G"
> "-Dspark.eventLog.dir=s3a://<*>/"
> "org.apache.spark.deploy.worker.DriverWrapper" "spark://Worker@<***>"
> "/usr/local/spark/work/driver-<***>.jar" "MyApp" "-c" "s3a://<***>"


When I looked into the Spark event log, below is what I observed:

{"Event":"SparkListenerExecutorAdded","Timestamp":1510633498623,"Executor 
ID":"driver","Executor Info":{"Host":"localhost","Total Cores":2,"Log Urls":{}}}
"spark.master":"local[*]"


Though I ran with deployMode as cluster, the slave IP is not shown in the Host
section and spark.master is shown as local[*] above. Because of this the job is
running only on the driver, and therefore when the job is submitted it does not
show up in http://:8080 under Running and Completed Applications; it shows only
under Running Drivers & Completed Drivers. Please suggest.





Spark http: Not showing completed apps

2017-11-08 Thread purna pradeep
Hi,

I'm using Spark standalone on AWS EC2, and I'm using the Spark REST API
http::8080/Json to get completed apps, but the JSON shows completed apps as an
empty array even though the job ran successfully.


Bulk load to HBase

2017-10-22 Thread Pradeep
We are on Hortonworks 2.5 and very soon upgrading to 2.6. Spark version is 1.6.2.

We have a large volume of data that we bulk load to HBase using ImportTsv. The
MapReduce job is very slow and we are looking for options; can we use Spark to
improve performance? Please let me know if this can be optimized with Spark and
what packages or libs can be used.
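
One option (a rough sketch only, not something I have run on HDP; the table name,
column family and paths below are placeholders) is to have Spark write HFiles
directly and then run the usual completebulkload step:

    import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job

    // assumes an existing SparkContext `sc` (e.g. in spark-shell)
    val hbaseConf = HBaseConfiguration.create()
    val tableName = TableName.valueOf("my_table")            // placeholder
    val conn = ConnectionFactory.createConnection(hbaseConf)
    val table = conn.getTable(tableName)
    val regionLocator = conn.getRegionLocator(tableName)

    // Configure the output format with the table's region boundaries so the
    // generated HFiles line up with the existing regions.
    val job = Job.getInstance(hbaseConf)
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

    // Parse the TSV into (rowkey, KeyValue) pairs in row-key order, which is
    // what HFileOutputFormat2 expects.
    val hfileRdd = sc.textFile("/data/input.tsv")            // placeholder
      .map(_.split("\t"))
      .map(cols => (cols(0), cols(1)))
      .sortByKey()
      .map { case (row, value) =>
        val rowKey = Bytes.toBytes(row)
        val kv = new KeyValue(rowKey, Bytes.toBytes("cf"), Bytes.toBytes("c1"),
          Bytes.toBytes(value))
        (new ImmutableBytesWritable(rowKey), kv)
      }

    hfileRdd.saveAsNewAPIHadoopFile(
      "/tmp/hfiles_out",                                     // placeholder
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

    // Then load the generated HFiles with the standard tool, e.g.
    //   hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /tmp/hfiles_out my_table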

PM

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-30 Thread purna pradeep
@Andres I need the latest, but it should be less than 10 months old based on the
income_age column, and I don't want to use SQL here.
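
For reference, a rough DataFrame-API equivalent of the SQL above (no spark.sql),
with the 10-month filter added; the column names follow the example in this
thread, the "M/d/yy" date format is an assumption about the sample data, and
to_date with a format argument needs Spark 2.2+:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // df has columns: JoinDate, dept, id, income, income_age_ts (as above)
    val withTs = df.withColumn("ts", to_date(col("income_age_ts"), "M/d/yy"))

    // keep only incomes younger than 10 months, then pick the latest one per id
    val recent = withTs.filter(months_between(current_date(), col("ts")) < 10)

    val w = Window.partitionBy(col("id")).orderBy(col("ts").desc)

    val latestIncome = recent
      .withColumn("r", row_number().over(w))
      .filter(col("r") === 1)
      .drop("r", "ts")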

On Wed, Aug 30, 2017 at 8:08 AM Andrés Ivaldi <iaiva...@gmail.com> wrote:

> Hi, if you need the last value from income in window function you can use
> last_value.
> No tested but meaby with @ayan sql
>
> spark.sql("select *, row_number(), last_value(income) over (partition by
> id order by income_age_ts desc) r from t")
>
>
> On Tue, Aug 29, 2017 at 11:30 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> @ayan,
>>
>> Thanks for your response
>>
>> I would like to have functions in this case  calculateIncome and the
>> reason why I need function is to reuse in other parts of the application
>> ..that's the reason I'm planning for mapgroups with function as argument
>> which takes rowiterator ..but not sure if this is the best to implement as
>> my initial dataframe is very large
>>
>> On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> the tool you are looking for is window function.  Example:
>>>
>>> >>> df.show()
>>> +++---+--+-+
>>> |JoinDate|dept| id|income|income_age_ts|
>>> +++---+--+-+
>>> | 4/20/13|  ES|101| 19000|  4/20/17|
>>> | 4/20/13|  OS|101| 1|  10/3/15|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|
>>> | 4/20/12|  CS|102| 12000|   5/8/17|
>>> | 4/20/10|  EQ|103| 1|   5/9/17|
>>> | 4/20/10|  MD|103|  9000|   5/8/17|
>>> +++---+--+-+
>>>
>>> >>> res = spark.sql("select *, row_number() over (partition by id order
>>> by income_age_ts desc) r from t")
>>> >>> res.show()
>>> +++---+--+-+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +++---+--+-+---+
>>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>>> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
>>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>>> | 4/20/13|  OS|101| 1|  10/3/15|  2|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>>> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
>>> +++---+--+-+---+
>>>
>>> >>> res = spark.sql("select * from (select *, row_number() over
>>> (partition by id order by income_age_ts desc) r from t) x where r=1")
>>> >>> res.show()
>>> +++---+--+-+---+
>>> |JoinDate|dept| id|income|income_age_ts|  r|
>>> +++---+--+-+---+
>>> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
>>> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
>>> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
>>> +++---+--+-+---+
>>>
>>> This should be better because it uses all in-built optimizations in
>>> Spark.
>>>
>>> Best
>>> Ayan
>>>
>>> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com
>>> > wrote:
>>>
>>>> Please click on unnamed text/html  link for better view
>>>>
>>>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> -- Forwarded message -
>>>>> From: Mamillapalli, Purna Pradeep <
>>>>> purnapradeep.mamillapa...@capitalone.com>
>>>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>>>> Subject: Spark question
>>>>> To: purna pradeep <purna2prad...@gmail.com>
>>>>>
>>>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>>>
>>>>>
>>>>>
>>>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>>>> 101        | 1      | 10/3/15       | 4/20/13  | OS

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
@ayan,

Thanks for your response

I would like to have functions in this case, e.g. calculateIncome, and the reason
I need a function is to reuse it in other parts of the application. That's the
reason I'm planning for mapGroups with a function as argument which takes a row
iterator, but I'm not sure if this is the best way to implement it, as my initial
dataframe is very large.

On Tue, Aug 29, 2017 at 10:24 PM ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> the tool you are looking for is window function.  Example:
>
> >>> df.show()
> +++---+--+-+
> |JoinDate|dept| id|income|income_age_ts|
> +++---+--+-+
> | 4/20/13|  ES|101| 19000|  4/20/17|
> | 4/20/13|  OS|101| 1|  10/3/15|
> | 4/20/12|  DS|102| 13000|   5/9/17|
> | 4/20/12|  CS|102| 12000|   5/8/17|
> | 4/20/10|  EQ|103| 1|   5/9/17|
> | 4/20/10|  MD|103|  9000|   5/8/17|
> +++---+--+-+
>
> >>> res = spark.sql("select *, row_number() over (partition by id order by
> income_age_ts desc) r from t")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/10|  MD|103|  9000|   5/8/17|  2|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/13|  OS|101| 1|  10/3/15|  2|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> | 4/20/12|  CS|102| 12000|   5/8/17|  2|
> +++---+--+-+---+
>
> >>> res = spark.sql("select * from (select *, row_number() over (partition
> by id order by income_age_ts desc) r from t) x where r=1")
> >>> res.show()
> +++---+--+-+---+
> |JoinDate|dept| id|income|income_age_ts|  r|
> +++---+--+-+---+
> | 4/20/10|  EQ|103| 1|   5/9/17|  1|
> | 4/20/13|  ES|101| 19000|  4/20/17|  1|
> | 4/20/12|  DS|102| 13000|   5/9/17|  1|
> +++---+--+-+---+
>
> This should be better because it uses all in-built optimizations in Spark.
>
> Best
> Ayan
>
> On Wed, Aug 30, 2017 at 11:06 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Please click on unnamed text/html  link for better view
>>
>> On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>>
>>> -- Forwarded message -
>>> From: Mamillapalli, Purna Pradeep <
>>> purnapradeep.mamillapa...@capitalone.com>
>>> Date: Tue, Aug 29, 2017 at 8:08 PM
>>> Subject: Spark question
>>> To: purna pradeep <purna2prad...@gmail.com>
>>>
>>> Below is the input Dataframe(In real this is a very large Dataframe)
>>>
>>>
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>> 101        | 1      | 10/3/15       | 4/20/13  | OS
>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>> 102        | 12000  | 5/8/17        | 4/20/12  | CS
>>> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>>> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>>>
>>> Get the latest income of an employee which has  Income_age ts <10 months
>>>
>>> Expected output Dataframe
>>>
>>> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
>>> 101        | 19000  | 4/20/17       | 4/20/13  | ES
>>> 102        | 13000  | 5/9/17        | 4/20/12  | DS
>>> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>>>
>

Re: Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
Please click on unnamed text/html  link for better view

On Tue, Aug 29, 2017 at 8:11 PM purna pradeep <purna2prad...@gmail.com>
wrote:

>
> -- Forwarded message -
> From: Mamillapalli, Purna Pradeep <
> purnapradeep.mamillapa...@capitalone.com>
> Date: Tue, Aug 29, 2017 at 8:08 PM
> Subject: Spark question
> To: purna pradeep <purna2prad...@gmail.com>
>
> Below is the input Dataframe(In real this is a very large Dataframe)
>
>
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101        | 19000  | 4/20/17       | 4/20/13  | ES
> 101        | 1      | 10/3/15       | 4/20/13  | OS
> 102        | 13000  | 5/9/17        | 4/20/12  | DS
> 102        | 12000  | 5/8/17        | 4/20/12  | CS
> 103        | 1      | 5/9/17        | 4/20/10  | EQ
> 103        | 9000   | 5/8/15        | 4/20/10  | MD
>
> Get the latest income of an employee which has  Income_age ts <10 months
>
> Expected output Dataframe
>
> EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
> 101        | 19000  | 4/20/17       | 4/20/13  | ES
> 102        | 13000  | 5/9/17        | 4/20/12  | DS
> 103        | 1      | 5/9/17        | 4/20/10  | EQ
>
>
>





Below is what im planning to implement
>
>
>
> case class employee (*EmployeeID*: Int, *INCOME*: Int, INCOMEAGE: Int,
> *JOINDATE*: Int,DEPT:String)
>
>
>
> *val *empSchema = *new *StructType().add(*"EmployeeID"*,*"Int"*).add(
> *"INCOME"*, *"Int"*).add(*"INCOMEAGE"*,*"Date"*) . add(*"JOINDATE"*,
> *"Date"*). add(*"DEPT"*,*"String"*)
>
>
>
> *//Reading from the File **import *sparkSession.implicits._
>
> *val *readEmpFile = sparkSession.read
>   .option(*"sep"*, *","*)
>   .schema(empSchema)
>   .csv(INPUT_DIRECTORY)
>
>
> *//Create employee DataFrame **val *custDf = readEmpFile.as[employee]
>
>
> *//Adding Salary Column **val *groupByDf = custDf.groupByKey(a => a.*
> EmployeeID*)
>
>
> *val *k = groupByDf.mapGroups((key,value) => performETL(value))
>
>
>
>
>
> *def *performETL(empData: Iterator[employee]) : new employee  = {
>
>   *val *empList = empData.toList
>
>
> *//calculate income has Logic to figureout latest income for an account
> and returns latest income   val *income = calculateIncome(empList)
>
>
>   *for *(i <- empList) {
>
>   *val *row = i
>
> *return new *employee(row.EmployeeID, row.INCOMEAGE , income)
>   }
>   *return  "Done"*
>
>
>
> }
>
>
>
> Is this a better approach or even the right approach to implement the
> same.If not please suggest a better way to implement the same?
>
>
>
>


Re: use WithColumn with external function in a java jar

2017-08-29 Thread purna pradeep
Thanks, I'll check it out.

On Mon, Aug 28, 2017 at 10:22 PM Praneeth Gayam <praneeth.ga...@gmail.com>
wrote:

> You can create a UDF which will invoke your java lib
>
> def calculateExpense: UserDefinedFunction = udf((pexpense: String, cexpense: 
> String) => new MyJava().calculateExpense(pexpense.toDouble, 
> cexpense.toDouble))
>
>
>
>
>
> On Tue, Aug 29, 2017 at 6:53 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> I have data in a DataFrame with below columns
>>
>> 1)Fileformat is csv
>> 2)All below column datatypes are String
>>
>> employeeid,pexpense,cexpense
>>
>> Now I need to create a new DataFrame which has new column called
>> `expense`, which is calculated based on columns `pexpense`, `cexpense`.
>>
>> The tricky part is the calculation algorithm is not an **UDF** function
>> which I created, but it's an external function that needs to be imported
>> from a Java library which takes primitive types as arguments - in this case
>> `pexpense`, `cexpense` - to calculate the value required for new column.
>>
>> The external function signature
>>
>> public class MyJava
>>
>> {
>>
>> public Double calculateExpense(Double pexpense, Double cexpense) {
>>// calculation
>> }
>>
>> }
>>
>> So how can I invoke that external function to create a new calculated
>> column. Can I register that external function as UDF in my Spark
>> application?
>>
>> Stackoverflow reference
>>
>>
>> https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
>>
>>
>>
>>
>>
>>
>


Select entire row based on a logic applied on 2 columns across multiple rows

2017-08-29 Thread purna pradeep
-- Forwarded message -
From: Mamillapalli, Purna Pradeep <purnapradeep.mamillapa...@capitalone.com>
Date: Tue, Aug 29, 2017 at 8:08 PM
Subject: Spark question
To: purna pradeep <purna2prad...@gmail.com>

Below is the input Dataframe(In real this is a very large Dataframe)



EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
101        | 19000  | 4/20/17       | 4/20/13  | ES
101        | 1      | 10/3/15       | 4/20/13  | OS
102        | 13000  | 5/9/17        | 4/20/12  | DS
102        | 12000  | 5/8/17        | 4/20/12  | CS
103        | 1      | 5/9/17        | 4/20/10  | EQ
103        | 9000   | 5/8/15        | 4/20/10  | MD

Get the latest income of an employee which has  Income_age ts <10 months

Expected output Dataframe

EmployeeID | INCOME | INCOME AGE TS | JoinDate | Dept
101        | 19000  | 4/20/17       | 4/20/13  | ES
102        | 13000  | 5/9/17        | 4/20/12  | DS
103        | 1      | 5/9/17        | 4/20/10  | EQ


Below is what im planning to implement



import org.apache.spark.sql.types.StructType

case class employee(EmployeeID: Int, INCOME: Int, INCOMEAGE: Int,
  JOINDATE: Int, DEPT: String)

val empSchema = new StructType().add("EmployeeID", "Int").add("INCOME", "Int")
  .add("INCOMEAGE", "Date").add("JOINDATE", "Date").add("DEPT", "String")

// Reading from the file
import sparkSession.implicits._

val readEmpFile = sparkSession.read
  .option("sep", ",")
  .schema(empSchema)
  .csv(INPUT_DIRECTORY)

// Create the employee Dataset
val custDf = readEmpFile.as[employee]

// Adding the income column: group by EmployeeID and compute per group
val groupByDf = custDf.groupByKey(a => a.EmployeeID)

val k = groupByDf.mapGroups((key, value) => performETL(value))

def performETL(empData: Iterator[employee]): employee = {

  val empList = empData.toList

  // calculateIncome has logic to figure out the latest income for an account
  // and returns the latest income
  val income = calculateIncome(empList)

  // return one record per employee, carrying the computed income
  val row = empList.head
  employee(row.EmployeeID, income, row.INCOMEAGE, row.JOINDATE, row.DEPT)
}



Is this a better approach, or even the right approach, to implement this? If
not, please suggest a better way to implement the same.





use WithColumn with external function in a java jar

2017-08-28 Thread purna pradeep
I have data in a DataFrame with below columns

1)Fileformat is csv
2)All below column datatypes are String

employeeid,pexpense,cexpense

Now I need to create a new DataFrame which has new column called `expense`,
which is calculated based on columns `pexpense`, `cexpense`.

The tricky part is the calculation algorithm is not an **UDF** function
which I created, but it's an external function that needs to be imported
from a Java library which takes primitive types as arguments - in this case
`pexpense`, `cexpense` - to calculate the value required for new column.

The external function signature

public class MyJava

{

public Double calculateExpense(Double pexpense, Double cexpense) {
   // calculation
}

}

So how can I invoke that external function to create a new calculated
column. Can I register that external function as UDF in my Spark
application?

Stackoverflow reference

https://stackoverflow.com/questions/45928007/use-withcolumn-with-external-function
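
For reference, a minimal sketch of wrapping the external Java method in a UDF
(assuming MyJava is on the driver and executor classpath and that the string
columns always parse as doubles; add null/parse handling as needed):

    import org.apache.spark.sql.functions.{col, udf}

    // Each invocation delegates the calculation to the external Java class.
    val expenseUdf = udf { (pexpense: String, cexpense: String) =>
      val result: Double = new MyJava().calculateExpense(pexpense.toDouble, cexpense.toDouble)
      result
    }

    val withExpense = df.withColumn("expense",
      expenseUdf(col("pexpense"), col("cexpense")))

    // Or register it by name for use in SQL / selectExpr:
    spark.udf.register("calc_expense", (p: String, c: String) =>
      new MyJava().calculateExpense(p.toDouble, c.toDouble).doubleValue)

    df.selectExpr("employeeid", "calc_expense(pexpense, cexpense) as expense")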


Re: Restart streaming query spark 2.1 structured streaming

2017-08-16 Thread purna pradeep
And also, is query.stop() a graceful stop operation? What happens to already
received data, will it be processed?

On Tue, Aug 15, 2017 at 7:21 PM purna pradeep <purna2prad...@gmail.com>
wrote:

> Ok thanks
>
> Few more
>
> 1.when I looked into the documentation it says onQueryprogress is not
> threadsafe ,So Is this method would be the right place to refresh cache?and
> no need to restart query if I choose listener ?
>
> The methods are not thread-safe as they may be called from different
> threads.
>
>
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
>
>
>
> 2.if I use streamingquerylistner onqueryprogress my understanding is
> method will be executed only when the query is in progress so if I refresh
> data frame here without restarting  query will it impact application ?
>
> 3.should I use unpersist (Boolean) blocking method or async method
> unpersist() as the data size is big.
>
> I feel your solution is better as it stops query --> refresh cache -->
> starts query if I compromise on little downtime even cached dataframe is
> huge .I'm not sure how listener behaves as it's asynchronous, correct me if
> I'm wrong.
>
> On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
> wrote:
>
>> Both works. The asynchronous method with listener will have less of down
>> time, just that the first trigger/batch after the asynchronous
>> unpersist+persist will probably take longer as it has to reload the data.
>>
>>
>> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
>> wrote:
>>
>>> Thanks tathagata das actually I'm planning to something like this
>>>
>>> activeQuery.stop()
>>>
>>> //unpersist and persist cached data frame
>>>
>>> df.unpersist()
>>>
>>> //read the updated data //data size of df is around 100gb
>>>
>>> df.persist()
>>>
>>>  activeQuery = startQuery()
>>>
>>>
>>> the cached data frame size around 100gb ,so the question is this the
>>> right place to refresh this huge cached data frame ?
>>>
>>> I'm also trying to refresh cached data frame in onqueryprogress() method
>>> in a class which extends StreamingQuerylistner
>>>
>>> Would like to know which is the best place to refresh cached data frame
>>> and why
>>>
>>> Thanks again for the below response
>>>
>>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>>> tathagata.das1...@gmail.com> wrote:
>>>
>>>> You can do something like this.
>>>>
>>>>
>>>> def startQuery(): StreamingQuery = {
>>>>// create your streaming dataframes
>>>>// start the query with the same checkpoint directory}
>>>>
>>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>>> while(!stopped) {
>>>>
>>>>    if (activeQuery = null) { // if query not active, start query
>>>>  activeQuery = startQuery()
>>>>
>>>>} else if (shouldRestartQuery())  {  // check your condition and 
>>>> restart query
>>>>  activeQuery.stop()
>>>>  activeQuery = startQuery()
>>>>}
>>>>
>>>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>>// if there is any error it will throw exception and quit the loop
>>>>// otherwise it will keep checking the condition every 100ms}
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com
>>>> > wrote:
>>>>
>>>>> Thanks Michael
>>>>>
>>>>> I guess my question is little confusing ..let me try again
>>>>>
>>>>>
>>>>> I would like to restart streaming query programmatically while my
>>>>> streaming application is running based on a condition and why I want to do
>>>>> this
>>>>>
>>>>> I want to refresh a cached data frame based on a condition and the
>>>>> best way to do this restart streaming query suggested by Tdas below for
>>>>> similar problem
>>>>>
>>>>>
>>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>>
>>&g

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Ok thanks

Few more

1. When I looked into the documentation it says onQueryProgress is not
thread-safe. So would this method be the right place to refresh the cache, and
is there no need to restart the query if I choose the listener?

"The methods are not thread-safe as they may be called from different threads."

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala

2. If I use the StreamingQueryListener onQueryProgress, my understanding is that
the method will be executed only when the query is in progress, so if I refresh
the data frame there without restarting the query, will it impact the
application?

3. Should I use the blocking unpersist(true) method or the async unpersist()
method, as the data size is big?

I feel your solution is better as it stops the query --> refreshes the cache -->
starts the query, even if I compromise on a little downtime since the cached
dataframe is huge. I'm not sure how the listener behaves as it's asynchronous;
correct me if I'm wrong.
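
For context, a bare-bones sketch of how such a listener would be wired up (the
refreshCacheIfDue name is a placeholder for your own unpersist/re-read/persist
logic; whether doing that inside onQueryProgress is safe for a ~100 GB cache is
exactly the open question here):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    val spark = SparkSession.builder.appName("listener-sketch").getOrCreate()

    // placeholder for your own unpersist + re-read + persist logic
    def refreshCacheIfDue(): Unit = ()

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

      // Called after every completed micro-batch; it runs on Spark's listener
      // bus thread, so heavy work here can delay other listeners.
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        refreshCacheIfDue()
      }
    })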

On Tue, Aug 15, 2017 at 6:36 PM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> Both works. The asynchronous method with listener will have less of down
> time, just that the first trigger/batch after the asynchronous
> unpersist+persist will probably take longer as it has to reload the data.
>
>
> On Tue, Aug 15, 2017 at 2:29 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Thanks tathagata das actually I'm planning to something like this
>>
>> activeQuery.stop()
>>
>> //unpersist and persist cached data frame
>>
>> df.unpersist()
>>
>> //read the updated data //data size of df is around 100gb
>>
>> df.persist()
>>
>>  activeQuery = startQuery()
>>
>>
>> the cached data frame size around 100gb ,so the question is this the
>> right place to refresh this huge cached data frame ?
>>
>> I'm also trying to refresh cached data frame in onqueryprogress() method
>> in a class which extends StreamingQuerylistner
>>
>> Would like to know which is the best place to refresh cached data frame
>> and why
>>
>> Thanks again for the below response
>>
>> On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> You can do something like this.
>>>
>>>
>>> def startQuery(): StreamingQuery = {
>>>// create your streaming dataframes
>>>// start the query with the same checkpoint directory}
>>>
>>> // handle to the active queryvar activeQuery: StreamingQuery = null
>>> while(!stopped) {
>>>
>>>if (activeQuery = null) { // if query not active, start query
>>>  activeQuery = startQuery()
>>>
>>>} else if (shouldRestartQuery())  {  // check your condition and 
>>> restart query
>>>  activeQuery.stop()
>>>  activeQuery = startQuery()
>>>}
>>>
>>>activeQuery.awaitTermination(100)   // wait for 100 ms.
>>>// if there is any error it will throw exception and quit the loop
>>>// otherwise it will keep checking the condition every 100ms}
>>>
>>>
>>>
>>>
>>> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Michael
>>>>
>>>> I guess my question is little confusing ..let me try again
>>>>
>>>>
>>>> I would like to restart streaming query programmatically while my
>>>> streaming application is running based on a condition and why I want to do
>>>> this
>>>>
>>>> I want to refresh a cached data frame based on a condition and the best
>>>> way to do this restart streaming query suggested by Tdas below for similar
>>>> problem
>>>>
>>>>
>>>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>>>
>>>> I do understand that checkpoint if helps in recovery and failures but I
>>>> would like to know "how to restart streaming query programmatically without
>>>> stopping my streaming application"
>>>>
>>>> In place of query.awaittermination should I need to have an logic to
>>>> restart query? Please suggest
>>>>
>>>>
>>>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> See
>>>>> https://spark.apache.org/docs/latest/structur

Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks Tathagata Das, actually I'm planning to do something like this:

activeQuery.stop()

//unpersist and persist cached data frame

df.unpersist()

//read the updated data //data size of df is around 100gb

df.persist()

 activeQuery = startQuery()


The cached data frame size is around 100 GB, so the question is: is this the
right place to refresh this huge cached data frame?

I'm also trying to refresh the cached data frame in the onQueryProgress() method
in a class which extends StreamingQueryListener.

Would like to know which is the best place to refresh the cached data frame, and
why.

Thanks again for the below response

On Tue, Aug 15, 2017 at 4:45 PM Tathagata Das <tathagata.das1...@gmail.com>
wrote:

> You can do something like this.
>
>
> def startQuery(): StreamingQuery = {
>   // create your streaming dataframes
>   // start the query with the same checkpoint directory
> }
>
> // handle to the active query
> var activeQuery: StreamingQuery = null
>
> while(!stopped) {
>
>   if (activeQuery == null) { // if query not active, start query
>     activeQuery = startQuery()
>
>   } else if (shouldRestartQuery()) {  // check your condition and restart query
>     activeQuery.stop()
>     activeQuery = startQuery()
>   }
>
>   activeQuery.awaitTermination(100)   // wait for 100 ms.
>   // if there is any error it will throw exception and quit the loop
>   // otherwise it will keep checking the condition every 100ms
> }
>
>
>
>
> On Tue, Aug 15, 2017 at 1:13 PM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Thanks Michael
>>
>> I guess my question is little confusing ..let me try again
>>
>>
>> I would like to restart streaming query programmatically while my
>> streaming application is running based on a condition and why I want to do
>> this
>>
>> I want to refresh a cached data frame based on a condition and the best
>> way to do this restart streaming query suggested by Tdas below for similar
>> problem
>>
>>
>> http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e
>>
>> I do understand that checkpoint if helps in recovery and failures but I
>> would like to know "how to restart streaming query programmatically without
>> stopping my streaming application"
>>
>> In place of query.awaittermination should I need to have an logic to
>> restart query? Please suggest
>>
>>
>> On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> See
>>> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>>>
>>> Though I think that this currently doesn't work with the console sink.
>>>
>>> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>>
>>>>> I'm trying to restart a streaming query to refresh cached data frame
>>>>>
>>>>> Where and how should I restart streaming query
>>>>>
>>>>
>>>>
>>>> val sparkSes = SparkSession
>>>>
>>>>   .builder
>>>>
>>>>   .config("spark.master", "local")
>>>>
>>>>   .appName("StreamingCahcePoc")
>>>>
>>>>   .getOrCreate()
>>>>
>>>>
>>>>
>>>> import sparkSes.implicits._
>>>>
>>>>
>>>>
>>>> val dataDF = sparkSes.readStream
>>>>
>>>>   .schema(streamSchema)
>>>>
>>>>   .csv("testData")
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>val query = counts.writeStream
>>>>
>>>>   .outputMode("complete")
>>>>
>>>>   .format("console")
>>>>
>>>>   .start()
>>>>
>>>>
>>>> query.awaittermination()
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>>
>>>
>


Re: Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Thanks Michael

I guess my question is a little confusing, let me try again.

I would like to restart a streaming query programmatically, based on a
condition, while my streaming application is running. Why I want to do this:

I want to refresh a cached data frame based on a condition, and the best way to
do this is to restart the streaming query, as suggested by TDas below for a
similar problem:

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3cCA+AHuKn+vSEWkJD=bsst6g5bdzdas6wmn+fwmn4jtm1x1nd...@mail.gmail.com%3e

I do understand that checkpointing helps with recovery and failures, but I would
like to know "how to restart a streaming query programmatically without stopping
my streaming application".

In place of query.awaitTermination, do I need to have logic to restart the
query? Please suggest.


On Tue, Aug 15, 2017 at 3:26 PM Michael Armbrust <mich...@databricks.com>
wrote:

> See
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> Though I think that this currently doesn't work with the console sink.
>
> On Tue, Aug 15, 2017 at 9:40 AM, purna pradeep <purna2prad...@gmail.com>
> wrote:
>
>> Hi,
>>
>>>
>>> I'm trying to restart a streaming query to refresh cached data frame
>>>
>>> Where and how should I restart streaming query
>>>
>>
>>
>> val sparkSes = SparkSession
>>
>>   .builder
>>
>>   .config("spark.master", "local")
>>
>>   .appName("StreamingCahcePoc")
>>
>>   .getOrCreate()
>>
>>
>>
>> import sparkSes.implicits._
>>
>>
>>
>> val dataDF = sparkSes.readStream
>>
>>   .schema(streamSchema)
>>
>>   .csv("testData")
>>
>>
>>
>>
>>
>>val query = counts.writeStream
>>
>>   .outputMode("complete")
>>
>>   .format("console")
>>
>>   .start()
>>
>>
>> query.awaittermination()
>>
>>
>>
>>>
>>>
>>>
>


Restart streaming query spark 2.1 structured streaming

2017-08-15 Thread purna pradeep
Hi,

>
> I'm trying to restart a streaming query to refresh cached data frame
>
> Where and how should I restart streaming query
>


val sparkSes = SparkSession

  .builder

  .config("spark.master", "local")

  .appName("StreamingCahcePoc")

  .getOrCreate()



import sparkSes.implicits._



val dataDF = sparkSes.readStream

  .schema(streamSchema)

  .csv("testData")





   val query = counts.writeStream

  .outputMode("complete")

  .format("console")

  .start()


query.awaittermination()



>
>
>


StreamingQueryListener spark structured streaming

2017-08-09 Thread purna pradeep
I'm working on a structured streaming application wherein I'm reading from
Kafka as a stream, and for each batch I need to perform an S3 lookup (a file
which is nearly 200 GB) to fetch some attributes. So I'm using df.persist()
(basically caching the lookup), but I need to refresh the dataframe as the S3
lookup data changes frequently. I'm using the below code:

class RefreshcachedDF(sparkSession: SparkSession) extends StreamingQueryListener {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}

  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    val currTime = System.currentTimeMillis()
    if (currTime > latestrefreshtime) {   // latest refresh time is kept in a global temp view
      // oldDF is a cached Dataframe created from a GlobalTempView which is of size 150GB.
      oldDF.unpersist()   // I guess this is an async call; should I use unpersist(true),
                          // which is blocking? And is it safe?
      val inputDf: DataFrame = readFile(spec, sparkSession)
      val recreateddf = inputDf.persist()
      val count = recreateddf.count()
    }
  }
}


Is the above approach a good solution to refresh the cached dataframe? The
trigger for this refresh is the cache expiry date for S3, which is stored in a
global temp view.

Note:S3 is one lookup source but i do have other sources which has data
size of 20 to 30 GB

 - So the question is: is this the right place to refresh the cached df?
 - If yes, should I use the blocking or non-blocking unpersist method, as the
data is huge (15 GB)?
 - For a similar issue I see the below response from TDas, with the subject
"Re: Refreshing a persisted RDD":

"Yes, you will have to recreate the streaming Dataframe along with the
static Dataframe, and restart the query. There isnt a currently
feasible to
do this without a query restart. But restarting a query WITHOUT
restarting
the whole application + spark cluster, is reasonably fast. If your
applicatoin can tolerate 10 second latencies, then stopping and
restarting
a query within the same Spark application is a reasonable solution."

[http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/browser]


  [1]: http://SparkMailingList

So if that's the better solution, should I restart the query as below?

query.processAllavaialble()
query.stop()
df.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession) //read file from S3
or anyother source
val recreateddf = inputDf.persist()
query.start()


when i looked into spark documentation of above methods
void processAllAvailable() ///documentation says This method is intended
for testing///
Blocks until all available data in the source has been processed and
committed to the sink. This method is intended for testing. Note that in
the case of continually arriving data, this method may block forever.
Additionally, this method is only guaranteed to block until data that has
been synchronously appended data to a Source prior to invocation. (i.e.
getOffset must immediately reflect the addition).

stop()
Stops the execution of this query if it is running. This method blocks
until the threads performing execution has stopped.

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/streaming/StreamingQuery.html#processAllAvailable()

Please suggest a better approach to refresh the cache.


Spark streaming - TIBCO EMS

2017-05-15 Thread Pradeep
What is the best way to connect to TIBCO EMS using spark streaming?

Do we need to write custom receivers, or do any libraries already exist?
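
I'm not aware of a bundled EMS connector, so a custom receiver over plain JMS is
the usual route. A rough sketch (untested; the TibjmsConnectionFactory class name
is an assumption based on TIBCO's tibjms client jar, and the URL/queue names are
placeholders):

    import javax.jms.{Connection, Message, MessageListener, Session, TextMessage}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import com.tibco.tibjms.TibjmsConnectionFactory   // from TIBCO's tibjms.jar (assumption)

    class EmsReceiver(serverUrl: String, user: String, pass: String, queue: String)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      @transient private var connection: Connection = _

      override def onStart(): Unit = {
        val factory = new TibjmsConnectionFactory(serverUrl)
        connection = factory.createConnection(user, pass)
        val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val consumer = session.createConsumer(session.createQueue(queue))

        // Push each text message into Spark's block manager.
        consumer.setMessageListener(new MessageListener {
          override def onMessage(msg: Message): Unit = msg match {
            case t: TextMessage => store(t.getText)
            case _              => // ignore non-text messages in this sketch
          }
        })
        connection.start()
      }

      override def onStop(): Unit = {
        if (connection != null) connection.close()
      }
    }

    // usage: ssc.receiverStream(new EmsReceiver("tcp://ems-host:7222", "user", "pass", "my.queue"))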

Thanks,
Pradeep

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Long Shuffle Read Blocked Time

2017-04-20 Thread Pradeep Gollakota
Hi All,

It appears that the bottleneck in my job was the EBS volumes. Very high i/o
wait times across the cluster. I was only using 1 volume. Increasing to 4
made it faster.

Thanks,
Pradeep

On Thu, Apr 20, 2017 at 3:12 PM, Pradeep Gollakota <pradeep...@gmail.com>
wrote:

> Hi All,
>
> I have a simple ETL job that reads some data, shuffles it and writes it
> back out. This is running on AWS EMR 5.4.0 using Spark 2.1.0.
>
> After Stage 0 completes and the job starts Stage 1, I see a huge slowdown
> in the job. The CPU usage is low on the cluster, as is the network I/O.
> From the Spark Stats, I see large values for the Shuffle Read Blocked Time.
> As an example, one of my tasks completed in 18 minutes, but spent 15
> minutes waiting for remote reads.
>
> I'm not sure why the shuffle is so slow. Are there things I can do to
> increase the performance of the shuffle?
>
> Thanks,
> Pradeep
>


Long Shuffle Read Blocked Time

2017-04-20 Thread Pradeep Gollakota
Hi All,

I have a simple ETL job that reads some data, shuffles it and writes it
back out. This is running on AWS EMR 5.4.0 using Spark 2.1.0.

After Stage 0 completes and the job starts Stage 1, I see a huge slowdown
in the job. The CPU usage is low on the cluster, as is the network I/O.
>From the Spark Stats, I see large values for the Shuffle Read Blocked Time.
As an example, one of my tasks completed in 18 minutes, but spent 15
minutes waiting for remote reads.

I'm not sure why the shuffle is so slow. Are there things I can do to
increase the performance of the shuffle?

Thanks,
Pradeep


Spark subscribe

2016-12-22 Thread pradeep s
Hi ,
Can you please add me to spark subscription list.
Regards
Pradeep S


wholeTextFiles()

2016-12-12 Thread Pradeep
Hi,

Why is there a restriction on the maximum file size that can be read by the
wholeTextFiles() method?

I can read a 1.5 GB file but get an out-of-memory error for a 2 GB file.

Also, how can I raise this as a defect in the Spark JIRA? Can someone please guide me.

Thanks,
Pradeep
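
A minimal sketch of the difference, with placeholder paths: wholeTextFiles() turns
each file into a single (path, content) record, so the whole file must fit in one
String in one task, which is where the out-of-memory error tends to come from;
textFile() produces one record per line and spreads a large, splittable file across tasks.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("whole-vs-line"))

// One record per file: the entire file becomes a single String value.
val byFile = sc.wholeTextFiles("hdfs:///data/small-files/*.txt")   // placeholder path

// One record per line: a large file is split across partitions instead.
val byLine = sc.textFile("hdfs:///data/big-file.txt")              // placeholder path
println(byLine.count())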




MLlib to Compute boundaries of a rectangle given random points on its Surface

2016-12-06 Thread Pradeep Gaddam
Hello,

Can someone please let me know if it is possible to construct a surface (for
example, a rectangle) given random points on its surface using Spark MLlib?

Thanks
Pradeep Gaddam
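
If the rectangle can be assumed axis-aligned, its boundaries are simply the min/max
of the observed coordinates, which needs only a plain aggregation rather than MLlib.
A minimal sketch under that assumption, with made-up sample points (a rotated
rectangle would need more, e.g. a PCA-style fit, which is not shown here):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{max, min}

val spark = SparkSession.builder().appName("bounding-box").getOrCreate()
import spark.implicits._

// Hypothetical sample of points lying on the rectangle's surface.
val points = Seq((1.0, 2.0), (4.0, 2.1), (1.1, 6.0), (3.9, 5.8)).toDF("x", "y")

// Axis-aligned boundaries are the coordinate extremes.
val b = points.agg(min("x"), max("x"), min("y"), max("y")).first()
println(s"x in [${b.getDouble(0)}, ${b.getDouble(1)}], y in [${b.getDouble(2)}, ${b.getDouble(3)}]")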






Re: Design patterns for Spark implementation

2016-12-04 Thread Pradeep Gaddam
I was hoping for someone to answer this question, as it resonates with many 
developers who are new to Spark and trying to adopt it at work.
Regards
Pradeep

On Dec 3, 2016, at 9:00 AM, Vasu Gourabathina <vgour...@gmail.com> wrote:

Hi,

I know this is a broad question. If this is not the right forum, appreciate if 
you can point to other sites/areas that may be helpful.

Before posing this question, I did use our friend Google, but sanitizing the 
query results from my need angle hasn't been easy.

Who I am:
   - Have done data processing and analytics, but relatively new to Spark world

What I am looking for:
  - Architecture/Design of a ML system using Spark
  - In particular, looking for best practices that can support/bridge both 
Engineering and Data Science teams

Engineering:
   - Build a system that has typical engineering needs, data processing, 
scalability, reliability, availability, fault-tolerance etc.
   - System monitoring etc.
Data Science:
   - Build a system for Data Science team to do data exploration activities
   - Develop models using supervised learning and tweak models

Data:
  - Batch and incremental updates - mostly structured or semi-structured (some 
data from transaction systems, weblogs, click stream etc.)
  - Streaming, in the near term, but not to begin with

Data Storage:
  - Data is expected to grow on a daily basis...so, system should be able to 
support and handle big data
  - May be, after further analysis, there might be a possibility/need to 
archive some of the data...it all depends on how the ML models were built and 
results were stored/used for future usage

Data Analysis:
  - Obvious data related aspects, such as data cleansing, data transformation, 
data partitioning etc
  - May be run models on windows of data. For example: last 1-year, 2-years etc.

ML models:
  - Ability to store model versions and previous results
  - Compare results of different variants of models

Consumers:
  - RESTful webservice clients to look at the results

So, the questions I have are:
1) Are there architectural and design patterns that I can use based on industry 
best-practices. In particular:
  - data ingestion
  - data storage (for eg. go with HDFS or not)
  - data partitioning, especially in Spark world
  - running parallel ML models and combining results etc.
  - consumption of final results by clients (for eg. by pushing results to 
Cassandra, NoSQL dbs etc.)

Again, I know this is a broad question... Pointers to some best-practices in 
some of the areas, if not all, would be highly appreciated. Open to purchase 
any books that may have relevant information.

Thanks much folks,
Vasu.





Kafka message metadata with Dstreams

2016-08-25 Thread Pradeep
Hi All,

I am using DStreams to read Kafka topics. While I can read the messages fine, I 
also want to get metadata on each message, such as its offset and the time it was 
put on the topic. Is there any Java API to get this info?

Thanks,
Pradeep
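
With the direct (Kafka 0.10) integration each record comes back as a ConsumerRecord,
which already carries topic, partition, offset and timestamp; the same classes are
usable from Java. A sketch in Scala with placeholder broker, topic and group id
(with the older 0.8 direct API, offsets are instead available per partition by
casting each RDD to HasOffsetRanges rather than per message):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("kafka-metadata"), Seconds(10))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",               // placeholder
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "metadata-demo")             // placeholder

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

// Each ConsumerRecord exposes the metadata alongside the payload.
stream.foreachRDD { rdd =>
  rdd.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} offset=${r.offset} ts=${r.timestamp} value=${r.value}")
  }
}

ssc.start()
ssc.awaitTermination()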




Log rollover in spark streaming jobs

2016-08-23 Thread Pradeep
Hi All,

I am running Java Spark Streaming jobs in yarn-client mode. Is there a way I 
can manage log rollover on the edge node? I have a 10-second batch and the log 
file volume is huge.

Thanks,
Pradeep
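
One option for the driver in yarn-client mode is to swap the console appender for a
size-bounded RollingFileAppender via a custom log4j properties file; the file paths,
sizes and appender name below are placeholders, and executor-side logs on YARN are
handled by YARN's own log aggregation rather than this driver-side file.

# log4j-driver.properties (placeholder values)
log4j.rootCategory=INFO, rolling
log4j.appender.rolling=org.apache.log4j.RollingFileAppender
log4j.appender.rolling.File=/var/log/myapp/streaming-driver.log
log4j.appender.rolling.MaxFileSize=100MB
log4j.appender.rolling.MaxBackupIndex=10
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# point the driver at it when submitting
--driver-java-options "-Dlog4j.configuration=file:/path/to/log4j-driver.properties"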




Re: Stop Spark Streaming Jobs

2016-08-02 Thread Pradeep
Thanks Park. I am doing the same. Was trying to understand if there are other 
ways.

Thanks,
Pradeep

> On Aug 2, 2016, at 10:25 PM, Park Kyeong Hee <kh1979.p...@samsung.com> wrote:
> 
> So sorry. Your name was Pradeep !!
> 
> -Original Message-
> From: Park Kyeong Hee [mailto:kh1979.p...@samsung.com] 
> Sent: Wednesday, August 03, 2016 11:24 AM
> To: 'Pradeep'; 'user@spark.apache.org'
> Subject: RE: Stop Spark Streaming Jobs
> 
> Hi. Paradeep
> 
> 
> Did you mean, how to kill the job?
> If yes, you should kill the driver and follow next.
> 
> on yarn-client
> 1. find pid - "ps -es | grep "
> 2. kill it - "kill -9 "
> 3. check executors were down - "yarn application -list"
> 
> on yarn-cluster
> 1. find driver's application ID - "yarn application -list"
> 2. stop it - "yarn application -kill "
> 3. check driver and executors were down - "yarn application -list"
> 
> 
> Thanks.
> 
> -Original Message-
> From: Pradeep [mailto:pradeep.mi...@mail.com] 
> Sent: Wednesday, August 03, 2016 10:48 AM
> To: user@spark.apache.org
> Subject: Stop Spark Streaming Jobs
> 
> Hi All,
> 
> My streaming job reads data from Kafka. The job is triggered and pushed to
> background with nohup.
> 
> What are the recommended ways to stop job either on yarn-client or cluster
> mode.
> 
> Thanks,
> Pradeep
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 





Stop Spark Streaming Jobs

2016-08-02 Thread Pradeep
Hi All,

My streaming job reads data from Kafka. The job is triggered and pushed to the 
background with nohup.

What are the recommended ways to stop the job, in either yarn-client or cluster mode?

Thanks,
Pradeep
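
Besides killing the YARN application, a common pattern is to have the job stop itself
gracefully when it notices an external marker (for example an HDFS file), so the
in-flight batch finishes before shutdown. A minimal sketch, with the marker path and
batch interval as placeholders; the actual DStream pipeline is assumed and not shown.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("graceful-stop"), Seconds(10))
// ... define the Kafka DStream and at least one output operation here ...
ssc.start()

val stopMarker = new Path("hdfs:///tmp/stop-my-streaming-job")   // placeholder marker file
val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)

// Poll for the marker; when it appears, stop without dropping the current batch.
var stopped = false
while (!stopped) {
  stopped = ssc.awaitTerminationOrTimeout(60000)   // true if the context already terminated
  if (!stopped && fs.exists(stopMarker)) {
    ssc.stop(stopSparkContext = true, stopGracefully = true)
    stopped = true
  }
}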




Re: spark-submit hangs forever after all tasks finish(spark 2.0.0 stable version on yarn)

2016-07-31 Thread Pradeep
Hi,

Are you running on yarn-client or cluster mode?

Pradeep

> On Jul 30, 2016, at 7:34 PM, taozhuo <taoz...@gmail.com> wrote:
> 
> below is the error messages that seem run infinitely:
> 
> 
> 16/07/30 23:25:38 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147247
> 16/07/30 23:25:39 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147247
> 16/07/30 23:25:39 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147248
> 16/07/30 23:25:40 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147248
> 16/07/30 23:25:40 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147249
> 16/07/30 23:25:41 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147249
> 16/07/30 23:25:41 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147250
> 16/07/30 23:25:42 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147250
> 16/07/30 23:25:42 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147251
> 16/07/30 23:25:43 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147251
> 16/07/30 23:25:43 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147252
> 16/07/30 23:25:44 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147252
> 16/07/30 23:25:44 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 0ms
> 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147253
> 16/07/30 23:25:45 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147253
> 16/07/30 23:25:45 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 0ms
> 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147254
> 16/07/30 23:25:46 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147254
> 16/07/30 23:25:46 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147255
> 16/07/30 23:25:47 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147255
> 16/07/30 23:25:47 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao sending #147256
> 16/07/30 23:25:48 DEBUG Client: IPC Client (1735131305) connection to
> /10.80.1.168:8032 from zhuotao got value #147256
> 16/07/30 23:25:48 DEBUG ProtobufRpcEngine: Call: getApplicationReport took
> 1ms
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-hangs-forever-after-all-tasks-finish-spark-2-0-0-stable-version-on-yarn-tp27436.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 





Re: Spark Website

2016-07-13 Thread Pradeep Gollakota
Worked for me if I go to https://spark.apache.org/site/ but not
https://spark.apache.org

On Wed, Jul 13, 2016 at 11:48 AM, Maurin Lenglart 
wrote:

> Same here
>
>
>
> *From: *Benjamin Kim 
> *Date: *Wednesday, July 13, 2016 at 11:47 AM
> *To: *manish ranjan 
> *Cc: *user 
> *Subject: *Re: Spark Website
>
>
>
> It takes me to the directories instead of the webpage.
>
>
>
> On Jul 13, 2016, at 11:45 AM, manish ranjan  wrote:
>
>
>
> working for me. What do you mean 'as supposed to'?
>
>
> ~Manish
>
>
>
> On Wed, Jul 13, 2016 at 11:45 AM, Benjamin Kim  wrote:
>
> Has anyone noticed that the spark.apache.org is not working as supposed
> to?
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>


Re: Spark-submit hangs indefinitely after job completion.

2016-05-24 Thread Pradeep Nayak
BTW, I am using a 6-node cluster with m4.2xlarge machines on amazon. I have
tried with both yarn-cluster and spark's native cluster mode as well.

On Tue, May 24, 2016 at 12:10 PM Mathieu Longtin <math...@closetwork.org>
wrote:

> I have been seeing the same behavior in standalone with a master.
>
>
> On Tue, May 24, 2016 at 3:08 PM Pradeep Nayak <pradeep1...@gmail.com>
> wrote:
>
>>
>>
>> I have posted the same question of Stack Overflow:
>> http://stackoverflow.com/questions/37421852/spark-submit-continues-to-hang-after-job-completion
>>
>> I am trying to test spark 1.6 with hdfs in AWS. I am using the wordcount
>> python example available in the examples folder. I submit the job with
>> spark-submit, the job completes successfully and its prints the results on
>> the console as well. The web-UI also says its completed. However the
>> spark-submit never terminates. I have verified that the context is stopped
>> in the word count example code as well.
>>
>> What could be wrong ?
>>
>> This is what I see on the console.
>>
>>
>> 6-05-24 14:58:04,749 INFO  [Thread-3] handler.ContextHandler 
>> (ContextHandler.java:doStop(843)) - stopped 
>> o.s.j.s.ServletContextHandler{/stages/stage,null}2016-05-24 14:58:04,749 
>> INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - 
>> stopped o.s.j.s.ServletContextHandler{/stages/json,null}2016-05-24 
>> 14:58:04,749 INFO  [Thread-3] handler.ContextHandler 
>> (ContextHandler.java:doStop(843)) - stopped 
>> o.s.j.s.ServletContextHandler{/stages,null}2016-05-24 14:58:04,749 INFO  
>> [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - 
>> stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}2016-05-24 
>> 14:58:04,750 INFO  [Thread-3] handler.ContextHandler 
>> (ContextHandler.java:doStop(843)) - stopped 
>> o.s.j.s.ServletContextHandler{/jobs/job,null}2016-05-24 14:58:04,750 INFO  
>> [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - 
>> stopped o.s.j.s.ServletContextHandler{/jobs/json,null}2016-05-24 
>> 14:58:04,750 INFO  [Thread-3] handler.ContextHandler 
>> (ContextHandler.java:doStop(843)) - stopped 
>> o.s.j.s.ServletContextHandler{/jobs,null}2016-05-24 14:58:04,802 INFO  
>> [Thread-3] ui.SparkUI (Logging.scala:logInfo(58)) - Stopped Spark web UI at 
>> http://172.30.2.239:40402016-05-24 14:58:04,805 INFO  [Thread-3] 
>> cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Shutting 
>> down all executors2016-05-24 14:58:04,805 INFO  [dispatcher-event-loop-2] 
>> cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Asking 
>> each executor to shut down2016-05-24 14:58:04,814 INFO  
>> [dispatcher-event-loop-5] spark.MapOutputTrackerMasterEndpoint 
>> (Logging.scala:logInfo(58)) - MapOutputTrackerMasterEndpoint 
>> stopped!2016-05-24 14:58:04,818 INFO  [Thread-3] storage.MemoryStore 
>> (Logging.scala:logInfo(58)) - MemoryStore cleared2016-05-24 14:58:04,818 
>> INFO  [Thread-3] storage.BlockManager (Logging.scala:logInfo(58)) - 
>> BlockManager stopped2016-05-24 14:58:04,820 INFO  [Thread-3] 
>> storage.BlockManagerMaster (Logging.scala:logInfo(58)) - BlockManagerMaster 
>> stopped2016-05-24 14:58:04,821 INFO  [dispatcher-event-loop-3] 
>> scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint 
>> (Logging.scala:logInfo(58)) - OutputCommitCoordinator stopped!2016-05-24 
>> 14:58:04,824 INFO  [Thread-3] spark.SparkContext (Logging.scala:logInfo(58)) 
>> - Successfully stopped SparkContext2016-05-24 14:58:04,827 INFO  
>> [sparkDriverActorSystem-akka.actor.default-dispatcher-2] 
>> remote.RemoteActorRefProvider$RemotingTerminator 
>> (Slf4jLogger.scala:apply$mcV$sp(74)) - Shutting down remote 
>> daemon.2016-05-24 14:58:04,828 INFO  
>> [sparkDriverActorSystem-akka.actor.default-dispatcher-2] 
>> remote.RemoteActorRefProvider$RemotingTerminator 
>> (Slf4jLogger.scala:apply$mcV$sp(74)) - Remote daemon shut down; proceeding 
>> with flushing remote transports.2016-05-24 14:58:04,843 INFO  
>> [sparkDriverActorSystem-akka.actor.default-dispatcher-2] 
>> remote.RemoteActorRefProvider$RemotingTerminator 
>> (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting shut down.
>>
>>
>> I have to do a ctrl-c to terminate the spark-submit process. This is
>> really a weird problem and I have no idea how to fix this. Please let me
>> know if there are any logs I should be looking at, or doing things
>> differently here.
>>
>>
>> --
> Mathieu Longtin
> 1-514-803-8977
>


Spark-submit hangs indefinitely after job completion.

2016-05-24 Thread Pradeep Nayak
I have posted the same question of Stack Overflow:
http://stackoverflow.com/questions/37421852/spark-submit-continues-to-hang-after-job-completion

I am trying to test spark 1.6 with hdfs in AWS. I am using the wordcount
python example available in the examples folder. I submit the job with
spark-submit, the job completes successfully and its prints the results on
the console as well. The web-UI also says its completed. However the
spark-submit never terminates. I have verified that the context is stopped
in the word count example code as well.

What could be wrong ?

This is what I see on the console.


2016-05-24 14:58:04,749 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
2016-05-24 14:58:04,749 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/stages/json,null}
2016-05-24 14:58:04,749 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/stages,null}
2016-05-24 14:58:04,749 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
2016-05-24 14:58:04,750 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
2016-05-24 14:58:04,750 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
2016-05-24 14:58:04,750 INFO  [Thread-3] handler.ContextHandler (ContextHandler.java:doStop(843)) - stopped o.s.j.s.ServletContextHandler{/jobs,null}
2016-05-24 14:58:04,802 INFO  [Thread-3] ui.SparkUI (Logging.scala:logInfo(58)) - Stopped Spark web UI at http://172.30.2.239:4040
2016-05-24 14:58:04,805 INFO  [Thread-3] cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Shutting down all executors
2016-05-24 14:58:04,805 INFO  [dispatcher-event-loop-2] cluster.SparkDeploySchedulerBackend (Logging.scala:logInfo(58)) - Asking each executor to shut down
2016-05-24 14:58:04,814 INFO  [dispatcher-event-loop-5] spark.MapOutputTrackerMasterEndpoint (Logging.scala:logInfo(58)) - MapOutputTrackerMasterEndpoint stopped!
2016-05-24 14:58:04,818 INFO  [Thread-3] storage.MemoryStore (Logging.scala:logInfo(58)) - MemoryStore cleared
2016-05-24 14:58:04,818 INFO  [Thread-3] storage.BlockManager (Logging.scala:logInfo(58)) - BlockManager stopped
2016-05-24 14:58:04,820 INFO  [Thread-3] storage.BlockManagerMaster (Logging.scala:logInfo(58)) - BlockManagerMaster stopped
2016-05-24 14:58:04,821 INFO  [dispatcher-event-loop-3] scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint (Logging.scala:logInfo(58)) - OutputCommitCoordinator stopped!
2016-05-24 14:58:04,824 INFO  [Thread-3] spark.SparkContext (Logging.scala:logInfo(58)) - Successfully stopped SparkContext
2016-05-24 14:58:04,827 INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Shutting down remote daemon.
2016-05-24 14:58:04,828 INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Remote daemon shut down; proceeding with flushing remote transports.
2016-05-24 14:58:04,843 INFO  [sparkDriverActorSystem-akka.actor.default-dispatcher-2] remote.RemoteActorRefProvider$RemotingTerminator (Slf4jLogger.scala:apply$mcV$sp(74)) - Remoting shut down.


I have to do a ctrl-c to terminate the spark-submit process. This is really
a weird problem and I have no idea how to fix this. Please let me know if
there are any logs I should be looking at, or doing things differently here.


Is this possible to do in spark ?

2016-05-11 Thread Pradeep Nayak
Hi -

I have a unique problem which I am trying to solve, and I am not sure
if Spark would help here.

I have a directory: /X/Y/a.txt and in the same structure /X/Y/Z/b.txt.

a.txt contains a unique serial number, say:
12345

and b.txt contains key value pairs.
a,1
b,1,
c,0 etc.

Every day you receive data for a system Y, so there are multiple a.txt and
b.txt files for a serial number. The serial number doesn't change, and that is
the key. There are multiple systems, a whole year of data is available, and it
is huge.

I am trying to generate a report of unique serial numbers where the value
of option a has changed to 1 over the last few months. Let's say the
default is 0. Also, figure out how many times it was toggled.


I am not sure how to read two text files in Spark at the same time and
associate them with the serial number. Is there a way of doing this in
place, given that we know the directory structure? Or should we be
transforming the data anyway to solve this?
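
One way to line the two files up is wholeTextFiles(), which keeps each file's path,
so both sides can be keyed by the system directory (the parent of a.txt). A sketch
under the assumptions that the layout is exactly the two levels shown above (a real
daily feed would add another path component), that each b.txt line looks like "a,1",
and with the root path as a placeholder; toggle counting is left out.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("serial-report"))

// Key a.txt files by their directory, value = serial number.
val serials = sc.wholeTextFiles("/X/*/a.txt").map { case (path, content) =>
  (path.stripSuffix("/a.txt"), content.trim)               // e.g. ("/X/Y", "12345")
}

// Key b.txt files by the same directory (b.txt sits one level deeper, under Z).
val optionA = sc.wholeTextFiles("/X/*/*/b.txt").map { case (path, content) =>
  val sysDir = path.split("/").dropRight(2).mkString("/")  // strip "Z/b.txt"
  val a = content.split("\n").map(_.split(",")).collectFirst {
    case Array("a", v, _*) => v.trim.toInt
  }.getOrElse(0)
  (sysDir, a)
}

// Join on the directory and report serials whose "a" value is 1.
val report = serials.join(optionA).values.filter(_._2 == 1).keys.distinct()
report.take(20).foreach(println)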


how does sc.textFile translate regex in the input.

2016-04-13 Thread Pradeep Nayak
I am trying to understand how Spark's sc.textFile() works. Specifically, I
have a question about how it translates paths with a regex in them.

For example:

files = sc.textFile("hdfs://:/file1/*/*/*/*.txt")

How does it find all the sub-directories and recurse to all the leaf
files? Is there any documentation on how this happens?

Pradeep
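
For what it's worth, the pattern is treated as a Hadoop glob rather than a regex:
sc.textFile() hands the path to the Hadoop input-format machinery, which expands it
with FileSystem.globStatus(), so each * matches one path component rather than
recursing arbitrarily. The expansion can be previewed with the same API; a small
sketch with a placeholder path (namenode host/port omitted as in the question):

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("glob-preview"))

// Expand the glob exactly the way textFile() will see it.
val pattern = new Path("hdfs:///file1/*/*/*/*.txt")
val fs = pattern.getFileSystem(sc.hadoopConfiguration)
val matched = Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
matched.map(_.getPath.toString).foreach(println)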


Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-11 Thread Pradeep Gollakota
IIRC, TextInputFormat supports an input path that is a comma separated
list. I haven't tried this, but I think you should just be able to do
sc.textFile("file1,file2,...")

On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang  wrote:

> I know these workaround, but wouldn't it be more convenient and
> straightforward to use SparkContext#textFiles ?
>
> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra 
> wrote:
>
>> For more than a small number of files, you'd be better off using
>> SparkContext#union instead of RDD#union.  That will avoid building up a
>> lengthy lineage.
>>
>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky 
>> wrote:
>>
>>> Hey Jeff,
>>> Do you mean reading from multiple text files? In that case, as a
>>> workaround, you can use the RDD#union() (or ++) method to concatenate
>>> multiple rdds. For example:
>>>
>>> val lines1 = sc.textFile("file1")
>>> val lines2 = sc.textFile("file2")
>>>
>>> val rdd = lines1 union lines2
>>>
>>> regards,
>>> --Jakob
>>>
>>> On 11 November 2015 at 01:20, Jeff Zhang  wrote:
>>>
 Although user can use the hdfs glob syntax to support multiple inputs.
 But sometimes, it is not convenient to do that. Not sure why there's no api
 of SparkContext#textFiles. It should be easy to implement that. I'd love to
 create a ticket and contribute for that if there's no other consideration
 that I don't know.

 --
 Best Regards

 Jeff Zhang

>>>
>>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>
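
For reference, the SparkContext#union workaround Mark mentions can be wrapped in a
couple of lines, which is close to what a textFiles() API would do; a minimal sketch:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Union many inputs through SparkContext#union to keep the lineage shallow.
def textFiles(sc: SparkContext, paths: Seq[String]): RDD[String] =
  sc.union(paths.map(sc.textFile(_)))

// val lines = textFiles(sc, Seq("file1", "file2", "file3"))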


Re: Why there's no api for SparkContext#textFiles to support multiple inputs ?

2015-11-11 Thread Pradeep Gollakota
Looks like what I was suggesting doesn't work. :/

On Wed, Nov 11, 2015 at 4:49 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> Yes, that's what I suggest. TextInputFormat support multiple inputs. So in
> spark side, we just need to provide API to for that.
>
> On Thu, Nov 12, 2015 at 8:45 AM, Pradeep Gollakota <pradeep...@gmail.com>
> wrote:
>
>> IIRC, TextInputFormat supports an input path that is a comma separated
>> list. I haven't tried this, but I think you should just be able to do
>> sc.textFile("file1,file2,...")
>>
>> On Wed, Nov 11, 2015 at 4:30 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>
>>> I know these workaround, but wouldn't it be more convenient and
>>> straightforward to use SparkContext#textFiles ?
>>>
>>> On Thu, Nov 12, 2015 at 2:27 AM, Mark Hamstra <m...@clearstorydata.com>
>>> wrote:
>>>
>>>> For more than a small number of files, you'd be better off using
>>>> SparkContext#union instead of RDD#union.  That will avoid building up a
>>>> lengthy lineage.
>>>>
>>>> On Wed, Nov 11, 2015 at 10:21 AM, Jakob Odersky <joder...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Jeff,
>>>>> Do you mean reading from multiple text files? In that case, as a
>>>>> workaround, you can use the RDD#union() (or ++) method to concatenate
>>>>> multiple rdds. For example:
>>>>>
>>>>> val lines1 = sc.textFile("file1")
>>>>> val lines2 = sc.textFile("file2")
>>>>>
>>>>> val rdd = lines1 union lines2
>>>>>
>>>>> regards,
>>>>> --Jakob
>>>>>
>>>>> On 11 November 2015 at 01:20, Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>>> Although user can use the hdfs glob syntax to support multiple
>>>>>> inputs. But sometimes, it is not convenient to do that. Not sure why
>>>>>> there's no api of SparkContext#textFiles. It should be easy to implement
>>>>>> that. I'd love to create a ticket and contribute for that if there's no
>>>>>> other consideration that I don't know.
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>
>>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [SparkR] Missing Spark APIs in R

2015-06-30 Thread Pradeep Bashyal
Thanks Shivaram. I watched your talk and the plan to use ML APIs with R
flavor looks exciting.
Is there a different venue where I would be able to follow the SparkR API
progress?

Thanks
Pradeep

On Mon, Jun 29, 2015 at 1:12 PM, Shivaram Venkataraman 
shiva...@eecs.berkeley.edu wrote:

 The RDD API is pretty complex and we are not yet sure we want to export
 all those methods in the SparkR API. We are working towards exposing a more
 limited API in upcoming versions. You can find some more details in the
 recent Spark Summit talk at
 https://spark-summit.org/2015/events/sparkr-the-past-the-present-and-the-future/

 Thanks
 Shivaram

 On Mon, Jun 29, 2015 at 9:40 AM, Pradeep Bashyal prad...@bashyal.com
 wrote:

 Hello,

 I noticed that some of the spark-core APIs are not available with version
 1.4.0 release of SparkR. For example textFile(), flatMap() etc. The code
 seems to be there but is not exported in NAMESPACE. They were all available
 as part of the AmpLab Extras previously. I wasn't able to find any
 explanations of why they were not included with the release.

 Can anyone shed some light on it?

 Thanks
 Pradeep





[SparkR] Missing Spark APIs in R

2015-06-29 Thread Pradeep Bashyal
Hello,

I noticed that some of the spark-core APIs are not available with version
1.4.0 release of SparkR. For example textFile(), flatMap() etc. The code
seems to be there but is not exported in NAMESPACE. They were all available
as part of the AmpLab Extras previously. I wasn't able to find any
explanations of why they were not included with the release.

Can anyone shed some light on it?

Thanks
Pradeep


Re: ClassCastException when calling updateStateKey

2015-04-10 Thread Pradeep Rai
Hi Marcelo,
   I am not including Spark's classes. When I used the userClasspathFirst
flag, I started getting those errors.

Been there, done that. Removing guava classes was one of the first things I
tried.

I saw your replies to a similar problem from Sept.

http://apache-spark-developers-list.1001551.n3.nabble.com/guava-version-conflicts-td8480.html

It looks like my issue is the same cause, but different symptoms.

Thanks,
Pradeep.

On Fri, Apr 10, 2015 at 12:51 PM, Marcelo Vanzin van...@cloudera.com
wrote:

 On Fri, Apr 10, 2015 at 10:11 AM, Pradeep Rai prai...@gmail.com wrote:
  I tried the userClasspathFirst flag. I started getting NoClassDeFound
  Exception for spark classes like Function2, etc.

 Wait. Are you including Spark classes in your app's assembly? Don't do
 that...

 As for Guava, yeah, the mess around Optional and friends is
 unfortunate. One way you could try to work around it, if excluding
 Spark classes and the userClassPathFirst option doesn't work, is to
 explicitly remove the Optional (and related) classes from your app's
 fat jar, and cross your fingers.

 --
 Marcelo



How to set hadoop native library path in spark-1.1

2014-10-21 Thread Pradeep Ch
Hi all,

Can anyone tell me how to set the native library path in Spark.

Right now I am setting it using the SPARK_LIBRARY_PATH environment variable
in spark-env.sh, but still no success.

I am still seeing this in spark-shell.

NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable


Thanks,
Pradeep
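
For reference, what usually works (in addition to, or instead of, SPARK_LIBRARY_PATH)
is putting the Hadoop native directory on the driver and executor library path; the
directory below is a placeholder for wherever libhadoop.so actually lives.

# spark-env.sh
export LD_LIBRARY_PATH=/opt/hadoop/lib/native:$LD_LIBRARY_PATH

# or equivalently in spark-defaults.conf (or via --conf on spark-submit)
spark.driver.extraLibraryPath    /opt/hadoop/lib/native
spark.executor.extraLibraryPath  /opt/hadoop/lib/native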


Re: Spark packaging

2014-04-09 Thread Pradeep baji
Thanks Prabeesh.


On Wed, Apr 9, 2014 at 12:37 AM, prabeesh k prabsma...@gmail.com wrote:

 Please refer

 http://prabstechblog.blogspot.in/2014/04/creating-single-jar-for-spark-project.html

 Regards,
 prabeesh


 On Wed, Apr 9, 2014 at 1:04 PM, Pradeep baji 
 pradeep.chanum...@gmail.comwrote:

 Hi all,

 I am new to spark and trying to learn it. Is there any document which
 describes how spark is packaged. ( like dependencies needed to build spark,
 which jar contains what after build etc)

 Thanks for the help.

 Regards,
 Pradeep





Multi master Spark

2014-04-09 Thread Pradeep Ch
Hi,

I want to enable Spark Master HA in Spark. The documentation specifies that we
can do this with the help of ZooKeeper. But what I am worried about is how to
configure one master with the other, and similarly, how do the workers know that
they have two masters? Where do you specify the multi-master information?

Thanks for the help.

Thanks,
Pradeep
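
For standalone mode the multi-master wiring goes through the spark.deploy.* recovery
properties passed to each master's JVM, and the workers and clients (including
spark-shell) are simply given every master in the URL; hosts and ports below are
placeholders for this sketch.

# spark-env.sh on each master (placeholder ZooKeeper ensemble)
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

# Point each worker at the same comma-separated master list when starting it, and
# use the same URL from clients; whichever master is the current leader accepts them:
./bin/spark-shell --master spark://master1:7077,master2:7077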


Re: Multi master Spark

2014-04-09 Thread Pradeep Ch
Thanks Dmitriy. But I want multi-master support when running Spark
standalone. Also, I want to know if this multi-master setup works when I use
spark-shell.


On Wed, Apr 9, 2014 at 3:26 PM, Dmitriy Lyubimov dlie...@gmail.com wrote:

 The only way i know to do this is to use mesos with zookeepers. you
 specify zookeeper url as spark url that contains multiple zookeeper hosts.
 Multiple mesos masters are then elected thru zookeeper leader election
 until current leader dies; at which point mesos will elect another master
 (if still left).

 iirc, in this mode spark master never runs, only master slaves are being
 spun by mesos slaves directly.





 On Wed, Apr 9, 2014 at 3:08 PM, Pradeep Ch pradeep.chanum...@gmail.comwrote:

 Hi,

 I want to enable Spark Master HA in spark. Documentation specifies that
 we can do this with the help of Zookeepers. But what I am worried is how to
 configure one master with the other and similarly how do workers know that
 the have two masters? where do you specify the multi-master information?

 Thanks for the help.

 Thanks,
 Pradeep