RE: Query on Spark Hive with kerberos Enabled on Kubernetes

2018-07-23 Thread Garlapati, Suryanarayana (Nokia - IN/Bangalore)
Hi Sandeep,
Any inputs on this?

Regards
Surya

From: Garlapati, Suryanarayana (Nokia - IN/Bangalore)
Sent: Saturday, July 21, 2018 6:50 PM
To: Sandeep Katta 
Cc: d...@spark.apache.org; user@spark.apache.org
Subject: RE: Query on Spark Hive with kerberos Enabled on Kubernetes

Hi Sandeep,
Thx for the response:
I am using the following commands (the XML files hive-site.xml, core-site.xml and 
hdfs-site.xml are made available by exporting the HADOOP_CONF_DIR 
environment variable).

For HDFS Access which succeeds:
./spark-submit --deploy-mode cluster --master 
k8s://https://k8s-apiserver.bcmt.cluster.local:8443 --kubernetes-namespace 
default --conf spark.kubernetes.kerberos.enabled=true --conf 
spark.kubernetes.kerberos.principal= --conf 
spark.kubernetes.kerberos.keytab= --conf 
spark.kubernetes.driver.docker.image= --conf 
spark.kubernetes.executor.docker.image= --conf 
spark.kubernetes.initcontainer.docker.image= --conf 
spark.kubernetes.resourceStagingServer.uri=http://:1 
../examples/src/main/python/wordcount.py hdfs://:8020/tmp/wordcount.txt


For Hive Access (this is failing):
./spark-submit --deploy-mode cluster --master 
k8s://https://k8s-apiserver.bcmt.cluster.local:8443 --kubernetes-namespace 
default --conf spark.kubernetes.kerberos.enabled=true --files /etc/krb5.conf, 
,../examples/src/main/resources/kv1.txt --conf 
spark.kubernetes.kerberos.principal= --conf 
spark.kubernetes.kerberos.keytab= --conf 
spark.kubernetes.driver.docker.image= --conf 
spark.kubernetes.executor.docker.image= --conf 
spark.kubernetes.initcontainer.docker.image= --conf 
spark.kubernetes.resourceStagingServer.uri=http://:1 
../examples/src/main/python/sql/hive.py

Following is the error:
2018-07-19 04:15:55 INFO  HiveUtils:54 - Initializing HiveMetastoreConnection 
version 1.2.1 using Spark classes.
2018-07-19 04:15:56 INFO  metastore:376 - Trying to connect to metastore with 
URI thrift://vm-10-75-145-54:9083
2018-07-19 04:15:56 ERROR TSaslTransport:315 - SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: 
No valid credentials provided (Mechanism level: Failed to find any Kerberos 
tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)

If I don’t provide the krb5.conf in the above spark-submit,
I get an error saying it is unable to find any default realm.
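
The "default realm" error typically means the JVM cannot find a usable krb5.conf inside the container. As a quick sanity check, the file shipped with --files should define a default realm under [libdefaults]; a minimal sketch (plain Python, with a hypothetical krb5.conf as input) of that check:

```python
import re

def default_realm(krb5_conf_text):
    """Return the default_realm from krb5.conf text, or None if missing."""
    in_libdefaults = False
    for line in krb5_conf_text.splitlines():
        stripped = line.strip()
        if stripped.startswith("["):
            # Track which section we are in; default_realm only counts
            # inside [libdefaults].
            in_libdefaults = stripped == "[libdefaults]"
        elif in_libdefaults:
            m = re.match(r"default_realm\s*=\s*(\S+)", stripped)
            if m:
                return m.group(1)
    return None

# Hypothetical krb5.conf content, for illustration only
sample = """
[libdefaults]
  default_realm = EXAMPLE.COM
  dns_lookup_kdc = false
"""
print(default_realm(sample))  # -> EXAMPLE.COM
```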

One workaround I found: if I generate a TGT with kinit and copy it into the driver 
pod at /tmp/krb5cc_0, it works fine. I guess this is not the intended way to do 
it; the TGT should be generated automatically so the job can access the Hive 
metastore. Please let me know if I am doing something wrong.

Regards
Surya

From: Sandeep Katta [mailto:sandeep0102.opensou...@gmail.com]
Sent: Friday, July 20, 2018 9:59 PM
To: Garlapati, Suryanarayana (Nokia - IN/Bangalore) 
<suryanarayana.garlap...@nokia.com>
Cc: d...@spark.apache.org; 
user@spark.apache.org
Subject: Re: Query on Spark Hive with kerberos Enabled on Kubernetes

Can you please tell us what exception you've got? Any logs for the same?

On Fri, 20 Jul 2018 at 8:36 PM, Garlapati, Suryanarayana (Nokia - IN/Bangalore) 
<suryanarayana.garlap...@nokia.com> 
wrote:
Hi All,
I am trying to use the Spark 2.2.0 Kubernetes fork 
(https://github.com/apache-spark-on-k8s/spark/tree/v2.2.0-kubernetes-0.5.0) 
to run Hive queries on a Kerberos-enabled cluster. Spark-submits fail 
for the Hive queries, but pass when I access HDFS. Is this a 
known limitation, or am I doing something wrong? Please let me know. If this is 
supposed to work, can you please point me to an example of running Hive queries?

Thanks.

Regards
Surya


Re: Spark on Mesos - Weird behavior

2018-07-23 Thread Thodoris Zois
Hi Susan,

This is exactly what we have used. Thank you for your interest!

- Thodoris 

> On 23 Jul 2018, at 20:55, Susan X. Huynh  wrote:
> 
> Hi Thodoris,
> 
> Maybe setting "spark.scheduler.minRegisteredResourcesRatio" to > 0 would 
> help? Default value is 0 with Mesos.
> 
> "The minimum ratio of registered resources (registered resources / total 
> expected resources) (resources are executors in yarn mode and Kubernetes 
> mode, CPU cores in standalone mode and Mesos coarse-grained mode 
> ['spark.cores.max' value is total expected resources for Mesos coarse-grained 
> mode]) to wait for before scheduling begins. Specified as a double between 
> 0.0 and 1.0. Regardless of whether the minimum ratio of resources has been 
> reached, the maximum amount of time it will wait before scheduling begins is 
> controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime." - 
> https://spark.apache.org/docs/latest/configuration.html
> 
> Susan
> 
>> On Wed, Jul 11, 2018 at 7:22 AM, Pavel Plotnikov 
>>  wrote:
>> Oh, sorry, I missed that you use Spark without dynamic allocation. Anyway, I 
>> don't know whether these parameters work without dynamic allocation. 
>> 
>>> On Wed, Jul 11, 2018 at 5:11 PM Thodoris Zois  wrote:
>>> Hello,
>>> 
>>> Yeah you are right, but I think that works only if you use Spark dynamic 
>>> allocation. Am I wrong?
>>> 
>>> -Thodoris
>>> 
 On 11 Jul 2018, at 17:09, Pavel Plotnikov  
 wrote:
 
 Hi, Thodoris
 You can configure resources per executor and control the number of 
 executors instead of using spark.cores.max. I think 
 spark.dynamicAllocation.minExecutors and 
 spark.dynamicAllocation.maxExecutors configuration values can help you.
 
> On Tue, Jul 10, 2018 at 5:07 PM Thodoris Zois  wrote:
> Actually after some experiments we figured out that spark.cores.max / 
> spark.executor.cores is the upper bound for the number of executors. Spark apps 
> will run even if only one executor can be launched. 
> 
> Is there any way to also specify the lower bound? It is a bit annoying 
> that we seemingly can’t control the resource usage of an application. By 
> the way, we are not using dynamic allocation. 
> 
> - Thodoris 
> 
> 
>> On 10 Jul 2018, at 14:35, Pavel Plotnikov 
>>  wrote:
>> 
>> Hello Thodoris!
>> Have you checked the following:
>>  - Does the Mesos cluster have available resources?
>>  - Does Spark have tasks waiting in the queue for longer than the 
>> spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
>>  - And have you checked that Mesos sends offers to the Spark app's Mesos 
>> framework with at least 10 cores and 2GB RAM?
>> 
>> If Mesos has no offers with 10 cores, but does have offers with 8 or 9, 
>> you can use smaller executors, for example with 4 cores and 1 GB RAM, to 
>> better fit the resources available on the nodes.
>> 
>> Cheers,
>> Pavel
>> 
>>> On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:
>>> Hello list,
>>> 
>>> We are running Apache Spark on a Mesos cluster and we face a weird 
>>> behavior of executors. When we submit an app with e.g. 10 cores and 2GB 
>>> of memory per executor and max cores 30, we expect to see 3 executors 
>>> running on the cluster. However, sometimes there are only 2... Spark 
>>> applications are not the only ones that run on the cluster. I guess that 
>>> Spark starts executors on the available offers even if they do not 
>>> satisfy our needs. Is there any configuration that we can use in order 
>>> to prevent Spark from starting when there are no resource offers for the 
>>> total number of executors?
>>> 
>>> Thank you 
>>> - Thodoris 
>>> 
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> 
>>> 
> 
> 
> 
> -- 
> Susan X. Huynh
> Software engineer, Data Agility
> xhu...@mesosphere.com
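
The gating behaviour described in the quoted docs can be illustrated with a small calculation (a sketch with hypothetical numbers; in Mesos coarse-grained mode the registered resources are CPU cores and the expected total is spark.cores.max, and the wait-time escape hatch is ignored here):

```python
def scheduling_may_begin(registered_cores, max_cores, min_ratio):
    """Mimic spark.scheduler.minRegisteredResourcesRatio gating in
    Mesos coarse-grained mode (ignoring maxRegisteredResourcesWaitingTime)."""
    return registered_cores >= min_ratio * max_cores

# With the Mesos default ratio of 0.0, scheduling starts immediately,
# even if only a fraction of spark.cores.max has registered.
print(scheduling_may_begin(registered_cores=20, max_cores=30, min_ratio=0.0))  # True
# Raising the ratio makes the driver wait for more cores first.
print(scheduling_may_begin(registered_cores=20, max_cores=30, min_ratio=1.0))  # False
print(scheduling_may_begin(registered_cores=30, max_cores=30, min_ratio=1.0))  # True
```

Setting the ratio to 1.0 approximates the lower bound Thodoris asks for, at the cost of waiting until all of spark.cores.max has been offered.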


Re: Spark on Mesos: Spark issuing hundreds of SUBSCRIBE requests / second and crashing Mesos

2018-07-23 Thread Nimi W
That does sound like it could be it - I checked our libmesos version and it
is 1.4.1. I'll try upgrading libmesos.

Thanks.

On Mon, Jul 23, 2018 at 12:13 PM Susan X. Huynh 
wrote:

> Hi Nimi,
>
> This sounds similar to a bug I have come across before. See:
> https://jira.apache.org/jira/browse/SPARK-22342?focusedCommentId=16429950&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16429950
>
> It turned out to be a bug in libmesos (the client library used to
> communicate with Mesos): "using a failoverTimeout of 0 with Mesos native
> scheduler client can result in infinite subscribe loop" (
> https://issues.apache.org/jira/browse/MESOS-8171). It can be fixed by
> upgrading to a version of libmesos that has the fix.
>
> Susan
>
>
> On Fri, Jul 13, 2018 at 3:39 PM, Nimi W  wrote:
>
>> I've come across an issue with Mesos 1.4.1 and Spark 2.2.1. We launch
>> Spark tasks using the MesosClusterDispatcher in cluster mode. On a couple
>> of occasions, we have noticed that when the Spark Driver crashes (due to
>> various causes - human error, network error), sometimes, when the Driver is
>> restarted, it issues hundreds of SUBSCRIBE requests to Mesos per second
>> until the Mesos Master node gets overwhelmed and crashes. It does this
>> again to the next master node, over and over until it takes down all the
>> master nodes. Usually the only fix is manually stopping the driver and
>> restarting.
>>
>> Here is a snippet of the log of the mesos master, which just logs the
>> repeated SUBSCRIBE command:
>> https://gist.github.com/nemosupremo/28ef4acfd7ec5bdcccee9789c021a97f
>>
>> Here is the output of the spark framework:
>> https://gist.github.com/nemosupremo/d098ef4def28ebf96c14d8f87aecd133 which
>> also just repeats 'Transport endpoint is not connected' over and over.
>>
>> Thanks for any insights
>>
>>
>>
>
>
> --
> Susan X. Huynh
> Software engineer, Data Agility
> xhu...@mesosphere.com
>


Re: Spark on Mesos: Spark issuing hundreds of SUBSCRIBE requests / second and crashing Mesos

2018-07-23 Thread Susan X. Huynh
Hi Nimi,

This sounds similar to a bug I have come across before. See:
https://jira.apache.org/jira/browse/SPARK-22342?focusedCommentId=16429950&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16429950

It turned out to be a bug in libmesos (the client library used to
communicate with Mesos): "using a failoverTimeout of 0 with Mesos native
scheduler client can result in infinite subscribe loop" (
https://issues.apache.org/jira/browse/MESOS-8171). It can be fixed by
upgrading to a version of libmesos that has the fix.

Susan


On Fri, Jul 13, 2018 at 3:39 PM, Nimi W  wrote:

> I've come across an issue with Mesos 1.4.1 and Spark 2.2.1. We launch
> Spark tasks using the MesosClusterDispatcher in cluster mode. On a couple
> of occasions, we have noticed that when the Spark Driver crashes (due to
> various causes - human error, network error), sometimes, when the Driver is
> restarted, it issues hundreds of SUBSCRIBE requests to Mesos per second
> until the Mesos Master node gets overwhelmed and crashes. It does this
> again to the next master node, over and over until it takes down all the
> master nodes. Usually the only fix is manually stopping the driver and
> restarting.
>
> Here is a snippet of the log of the mesos master, which just logs the
> repeated SUBSCRIBE command: https://gist.github.com/nemosupremo/28ef4acfd7ec5bdcccee9789c021a97f
>
> Here is the output of the spark framework: https://gist.github.com/nemosupremo/d098ef4def28ebf96c14d8f87aecd133 which also just
> repeats 'Transport endpoint is not connected' over and over.
>
> Thanks for any insights
>
>
>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: Interest in adding ability to request GPU's to the spark client?

2018-07-23 Thread Susan X. Huynh
There's some discussion and proposal of supporting GPUs in this Spark JIRA:
https://jira.apache.org/jira/browse/SPARK-24615 "Accelerator-aware task
scheduling for Spark"

Susan

On Thu, Jul 12, 2018 at 11:17 AM, Mich Talebzadeh  wrote:

> I agree.
>
> Adding GPU capability to Spark in my opinion is a must for Advanced
> Analytics.
>
> HTH
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 12 Jul 2018 at 19:14, Maximiliano Felice <
> maximilianofel...@gmail.com> wrote:
>
>> Hi,
>>
>> I've been meaning to reply to this email for a while now, sorry for
>> taking so much time.
>>
>> I personally think that adding GPU resource management will allow us to
>> boost some ETL performance a lot. For the last year, I've worked in
>> transforming some Machine Learning pipelines from Python in Numpy/Pandas to
>> Spark. Adding GPU capabilities to Spark would:
>>
>>
>>- Accelerate many matrix and batch computations we currently have in
>>Tensorflow
>>- Allow us to use spark for the whole pipeline (combined with
>>possibly better online serving)
>>- Let us trigger better Hyperparameter selection directly from Spark
>>
>>
>> There will be many more aspects of this that we could explore. What do
>> the rest of the list think?
>>
>> See you
>>
>> El mié., 16 may. 2018 a las 2:58, Daniel Galvez ()
>> escribió:
>>
>>> Hi all,
>>>
>>> Is anyone here interested in adding the ability to request GPUs to
>>> Spark's client (i.e., spark-submit)? As of now, Yarn 3.0's resource manager
>>> server has the ability to schedule GPUs as resources via cgroups, but the
>>> Spark client lacks the ability to request these.
>>>
>>> The ability to guarantee GPU resources would be practically useful for
>>> my organization. Right now, the only way to do that is to request the
>>> entire memory (or all CPU's) on a node, which is very kludgey and wastes
>>> resources, especially if your node has more than 1 GPU and your code was
>>> written such that an executor can use only one GPU at a time.
>>>
>>> I'm just not sure of a good way to make use of libraries like
>>> Databricks' Deep Learning pipelines for GPU-heavy
>>> computation otherwise, unless you are luckily in an organization which is
>>> able to virtualize computer nodes such that each node will have only one
>>> GPU. Of course, I realize that many Databricks customers are using Azure or
>>> AWS, which allow you to do this easily. Is this what people normally do
>>> in industry?
>>>
>>> This is something I am interested in working on, unless others out there
>>> have advice on why this is a bad idea.
>>>
>>> Unfortunately, I am not familiar enough with Mesos and Kubernetes right
>>> now to know how they schedule gpu resources and whether adding support for
>>> requesting GPU's from them to the spark-submit client would be simple.
>>>
>>> Daniel
>>>
>>> --
>>> Daniel Galvez
>>> http://danielgalvez.me
>>> https://github.com/galv
>>>
>>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: Re: Re: spark sql data skew

2018-07-23 Thread Gourav Sengupta
https://docs.databricks.com/spark/latest/spark-sql/skew-join.html

The above might help, in case you are using a join.

On Mon, Jul 23, 2018 at 4:49 AM, 崔苗  wrote:

> but how to get count(distinct userId) group by company from count(distinct
> userId) group by company+x?
> count(userId) is different from count(distinct userId)
>
>
> On 2018-07-21 00:49:58, Xiaomeng Wan wrote:
>
> try divide and conquer: create a column x for the first character of
> userid, and group by company+x. If still too large, try the first two characters.
>
> On 17 July 2018 at 02:25, 崔苗  wrote:
>
>> With 30G of user data, how do we get the distinct user count after creating a
>> composite key based on company and userid?
>>
>>
>> On 2018-07-13 18:24:52, Jean Georges Perrin wrote:
>>
>> Just thinking out loud… repartition by key? create a composite key based
>> on company and userid?
>>
>> How big is your dataset?
>>
>> On Jul 13, 2018, at 06:20, 崔苗  wrote:
>>
>> Hi,
>> when I want to count(distinct userId) by company, I run into data skew and
>> the task takes too long. How do I count distinct by key on skewed data in
>> Spark SQL?
>>
>> thanks for any reply
>>
>>
>>
>>
>
>
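
To the question upthread: summing the per-bucket distinct counts is valid because each userId falls into exactly one prefix bucket, so the buckets partition each company's users. A plain-Python simulation (hypothetical data) of the two-stage aggregation:

```python
from collections import defaultdict

rows = [  # (company, userId) pairs, hypothetical skewed data
    ("acme", "u1"), ("acme", "u1"), ("acme", "u2"),
    ("acme", "v9"), ("beta", "u1"), ("beta", "w3"),
]

# Stage 1: distinct users per (company, first character of userId)
stage1 = defaultdict(set)
for company, user in rows:
    stage1[(company, user[0])].add(user)

# Stage 2: per-bucket distinct counts sum to the per-company distinct count,
# because each userId belongs to exactly one bucket.
totals = defaultdict(int)
for (company, _prefix), users in stage1.items():
    totals[company] += len(users)

print(dict(totals))  # {'acme': 3, 'beta': 2}
```

In Spark SQL terms this would correspond to something like a first query grouping by (company, substring(userId, 1, 1)) with count(DISTINCT userId), followed by a second query that sums those counts grouped by company.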


Re: Spark on Mesos - Weird behavior

2018-07-23 Thread Susan X. Huynh
Hi Thodoris,

Maybe setting "spark.scheduler.minRegisteredResourcesRatio" to > 0 would
help? Default value is 0 with Mesos.

"The minimum ratio of registered resources (registered resources / total
expected resources) (resources are executors in yarn mode and Kubernetes
mode, CPU cores in standalone mode and Mesos coarse-grained mode
['spark.cores.max' value is total expected resources for Mesos
coarse-grained mode]) to wait for before scheduling begins. Specified as a
double between 0.0 and 1.0. Regardless of whether the minimum ratio of
resources has been reached, the maximum amount of time it will wait before
scheduling begins is controlled by config
spark.scheduler.maxRegisteredResourcesWaitingTime." -
https://spark.apache.org/docs/latest/configuration.html

Susan

On Wed, Jul 11, 2018 at 7:22 AM, Pavel Plotnikov <
pavel.plotni...@team.wrike.com> wrote:

> Oh, sorry, I missed that you use Spark without dynamic allocation. Anyway,
> I don't know whether these parameters work without dynamic allocation.
>
> On Wed, Jul 11, 2018 at 5:11 PM Thodoris Zois  wrote:
>
>> Hello,
>>
>> Yeah you are right, but I think that works only if you use Spark dynamic
>> allocation. Am I wrong?
>>
>> -Thodoris
>>
>> On 11 Jul 2018, at 17:09, Pavel Plotnikov 
>> wrote:
>>
>> Hi, Thodoris
>> You can configure resources per executor and control the number of
>> executors instead of using spark.cores.max. I think 
>> spark.dynamicAllocation.minExecutors
>> and spark.dynamicAllocation.maxExecutors configuration values can help
>> you.
>>
>> On Tue, Jul 10, 2018 at 5:07 PM Thodoris Zois  wrote:
>>
>>> Actually after some experiments we figured out that spark.cores.max /
>>> spark.executor.cores is the upper bound for the number of executors. Spark
>>> apps will run even if only one executor can be launched.
>>>
>>> Is there any way to also specify the lower bound? It is a bit annoying
>>> that we seemingly can’t control the resource usage of an application. By
>>> the way, we are not using dynamic allocation.
>>>
>>> - Thodoris
>>>
>>>
>>> On 10 Jul 2018, at 14:35, Pavel Plotnikov <pavel.plotni...@team.wrike.com> wrote:
>>>
>>> Hello Thodoris!
>>> Have you checked the following:
>>>  - Does the Mesos cluster have available resources?
>>>  - Does Spark have tasks waiting in the queue for longer than the
>>> spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
>>>  - And have you checked that Mesos sends offers to the Spark app's Mesos
>>> framework with at least 10 cores and 2GB RAM?
>>>
>>> If Mesos has no offers with 10 cores, but does have offers with 8 or 9,
>>> you can use smaller executors, for example with 4 cores and 1 GB RAM, to
>>> better fit the resources available on the nodes.
>>>
>>> Cheers,
>>> Pavel
>>>
>>> On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:
>>>
 Hello list,

 We are running Apache Spark on a Mesos cluster and we face a weird
 behavior of executors. When we submit an app with e.g. 10 cores and 2GB of
 memory per executor and max cores 30, we expect to see 3 executors running on
 the cluster. However, sometimes there are only 2... Spark applications are not
 the only ones that run on the cluster. I guess that Spark starts executors
 on the available offers even if they do not satisfy our needs. Is there any
 configuration that we can use in order to prevent Spark from starting when
 there are no resource offers for the total number of executors?

 Thank you
 - Thodoris

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org


>>


-- 
Susan X. Huynh
Software engineer, Data Agility
xhu...@mesosphere.com


Re: [Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking

2018-07-23 Thread Abhishek Tripathi
Hello Dev!
A Spark structured streaming job with a simple window aggregation is leaking
file descriptors on Kubernetes as the cluster manager setup. It seems to be a bug.
I am using HDFS as the filesystem for checkpointing.
Has anyone observed the same? Thanks for any help.

Please find more details in trailing email.


For more of the error log, please see this GitHub gist:
https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566
Some details about the file descriptors (lsof):
https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a
Code snippet:
https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74

Thanks.

Best Regards,
*Abhishek Tripathi*
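
When chasing a leak like this, it helps to log the process's descriptor count against its limit over time. A minimal, Spark-agnostic sketch (Linux: counts entries in /proc/self/fd; on platforms without /proc it only reports the soft limit):

```python
import os
import resource

def fd_usage():
    """Return (open_fds, soft_limit) for the current process."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    try:
        open_fds = len(os.listdir("/proc/self/fd"))  # Linux only
    except FileNotFoundError:
        open_fds = -1  # /proc not available (e.g. macOS)
    return open_fds, soft

open_fds, soft = fd_usage()
print(f"open fds: {open_fds} / soft limit: {soft}")
# A steadily growing open_fds under a stable workload suggests a leak,
# which raising the ulimit (as tried below) only postpones.
```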


On Thu, Jul 19, 2018 at 10:02 AM Abhishek Tripathi 
wrote:

> Hello All!
> I am using Spark 2.3.1 on Kubernetes to run a structured streaming Spark
> job which reads a stream from Kafka, performs some window aggregation and
> sinks the output to Kafka.
> After the job has been running a few hours (5-6 hours), the executor pods
> crash, caused by "Too many open files in system".
> Digging in further with the "lsof" command, I can see a lot of UNIX
> pipes being opened.
>
> # lsof -p 14 | tail
> java 14 root *112u  a_inode   0,100  8838
> [eventpoll]
> java 14 root *113r FIFO0,9  0t0 252556158 pipe
> java 14 root *114w FIFO0,9  0t0 252556158 pipe
> java 14 root *115u  a_inode   0,100  8838
> [eventpoll]
> java 14 root *119r FIFO0,9  0t0 252552868 pipe
> java 14 root *120w FIFO0,9  0t0 252552868 pipe
> java 14 root *121u  a_inode   0,100  8838
> [eventpoll]
> java 14 root *131r FIFO0,9  0t0 252561014 pipe
> java 14 root *132w FIFO0,9  0t0 252561014 pipe
> java 14 root *133u  a_inode   0,100  8838
> [eventpoll]
>
> The total count of open fds grows up to 85K (an increased hard ulimit) for
> each pod, and once it hits the hard limit, the executor pod crashes.
> For shuffling I can see it needing more fds, but in my case the open fd count
> keeps growing forever. I am not sure how to estimate how many fds would be
> adequate, or whether there is a bug.
> With that uncertainty, I increased the hard ulimit to a large number, 85k, but
> no luck.
> It seems like there is a file descriptor leak.
>
> This Spark job is running with native Kubernetes support as the Spark
> cluster manager. It currently uses only two executors with 20 cores (request)
> and 10GB (+6GB as memoryOverhead) of physical memory each.
>
> Has anyone else seen a similar problem?
> Thanks for any suggestion.
>
>
> Error details:
> Caused by: java.io.FileNotFoundException:
> /tmp/blockmgr-b530983c-39be-4c6d-95aa-3ad12a507843/24/temp_shuffle_bf774cf5-fadb-4ca7-a27b-a5be7b835eb6
> (Too many open files in system)
> at java.io.FileOutputStream.open0(Native Method)
> at java.io.FileOutputStream.open(FileOutputStream.java:270)
> at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
>
> For more error log, please follow below Github gist:
>
> https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566
>
>
> Some details about file descriptor (lsof):
> https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a
>
> Code Snip:
> https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74
>
> Platform  Details:
> Kubernets Version : 1.9.2
> Docker : 17.3.2
> Spark version:  2.3.1
> Kafka version: 2.11-0.10.2.1 (both topic has 20 partition and getting
> almost 5k records/s )
> Hadoop version (Using hdfs for check pointing)  : 2.7.2
>
> Thank you for any help.
>
> Best Regards,
> *Abhishek Tripathi*
>
>


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-23 Thread Silvio Fiorito
Using the current Kafka sink that supports routing based on topic column, you 
could just duplicate the rows (e.g. explode rows with different topic, key 
values). That way you’re only reading and processing the source once and not 
having to resort to custom sinks, foreachWriter, or multiple queries.
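
The row-duplication idea can be sketched without Spark: for each source record, emit one copy per destination topic, and let the topic-aware sink route each copy (the record shape and topic names here are hypothetical):

```python
def fan_out(records, topics):
    """Duplicate each record once per destination topic, mimicking an
    explode over a topics array before a topic-routing Kafka sink."""
    return [
        {"topic": topic, "key": rec["key"], "value": rec["value"]}
        for rec in records
        for topic in topics
    ]

records = [{"key": "k1", "value": "v1"}, {"key": "k2", "value": "v2"}]
out = fan_out(records, ["alerts", "audit"])
print(len(out))         # 4: each record goes to both topics
print(out[0]["topic"])  # alerts
```

In Spark SQL, the equivalent shape would come from something like exploding an array of topic literals into a "topic" column before writing to the Kafka sink, which routes on that column.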

In Spark 2.4 there will be a foreachBatch method that will give you a DataFrame 
and let you write to the sink as you wish.

From: kant kodali 
Date: Monday, July 23, 2018 at 4:43 AM
To: Arun Mahadevan 
Cc: chandan prakash , Tathagata Das 
, "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

I understand each row has a topic column, but can we write one row to multiple 
topics?

On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan <ar...@apache.org> wrote:
What I meant was the number of partitions cannot be varied with ForeachWriter 
v/s if you were to write to each sink using independent queries. Maybe this is 
obvious.

I am not sure about the difference you highlight about the performance part. 
The commit happens once per micro batch and "close(null)" is invoked. You can 
batch your writes in the process and/or in the close. I guess the writes can 
still be atomic and decided by whether “close” returns successfully or throws an 
exception.

Thanks,
Arun

From: chandan prakash <chandanbaran...@gmail.com>
Date: Thursday, July 12, 2018 at 10:37 AM
To: Arun Iyer <ar...@apache.org>
Cc: Tathagata Das <tathagata.das1...@gmail.com>, 
"ymaha...@snappydata.io" <ymaha...@snappydata.io>, 
"priy...@asperasoft.com" <priy...@asperasoft.com>, "user @spark" 
<user@spark.apache.org>

Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Thanks a lot Arun for your response.
I got your point that existing sink plugins like Kafka, etc. cannot be used.
However I could not get the part: "you cannot scale the partitions for the 
sinks independently".
Can you please rephrase that part?

Also,
I guess:
using a ForeachWriter for multiple sinks will affect performance because 
writes happen to a sink on a per-record basis (after deciding which particular 
sink a record belongs to), whereas in the current implementation all data 
under an RDD partition gets committed to the sink atomically in one go. Please 
correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan <ar...@apache.org> wrote:
Yes ForeachWriter [1] could be an option If you want to write to different 
sinks. You can put your custom logic to split the data into different sinks.

The drawback here is that you cannot plugin existing sinks like Kafka and you 
need to write the custom logic yourself and you cannot scale the partitions for 
the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
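
The split logic discussed here can be sketched as a ForeachWriter-like class that routes each record to a sink chosen by a marker field (the sinks and the marker field are hypothetical; a real writer would implement the same open/process/close contract against actual sink clients):

```python
class RoutingWriter:
    """Route each record to one of several sinks based on a marker field.

    Mirrors the open/process/close contract of a streaming foreach writer;
    here the "sinks" are plain lists so the sketch is self-contained.
    """

    def __init__(self, sinks):
        self.sinks = sinks  # marker -> list acting as a sink

    def open(self, partition_id, epoch_id):
        return True  # accept every partition/epoch

    def process(self, record):
        # Per-record routing: this is the cost noted in the thread, versus
        # committing a whole partition to one sink in one go.
        self.sinks[record["marker"]].append(record["value"])

    def close(self, error):
        if error:
            raise error

sinks = {"a": [], "b": []}
writer = RoutingWriter(sinks)
writer.open(0, 0)
for rec in [{"marker": "a", "value": 1},
            {"marker": "b", "value": 2},
            {"marker": "a", "value": 3}]:
    writer.process(rec)
writer.close(None)
print(sinks)  # {'a': [1, 3], 'b': [2]}
```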

From: chandan prakash <chandanbaran...@gmail.com>
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das <tathagata.das1...@gmail.com>, 
"ymaha...@snappydata.io" <ymaha...@snappydata.io>, 
"priy...@asperasoft.com" <priy...@asperasoft.com>, "user @spark" 
<user@spark.apache.org>
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Hi,
Has anyone of you thought about writing a custom foreach sink writer which can 
decide which record should go to which sink (based on some marker in the record, 
which we can possibly annotate during transformation) and then accordingly 
write to the specific sink?
This would mean that:
1. every custom sink writer will have connections to as many sinks as there 
are types of sink where records can go.
2. every record will be read once in the single query but can be written to 
multiple sinks

Do you guys see any drawback in this approach?
One drawback, of course, is that the sink is supposed to write the records as 
they are, but we are inducing some intelligence here in the sink.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das <tathagata.das1...@gmail.com> wrote:
Of course, you can write to multiple Kafka topics from a single query. If your 
dataframe that you want to write has a column named "topic" (along with "key", 
and "value" columns), it will write the contents of a row to the topic in that 
row. This automatically works. So the only thing you need to figure out is how 
to generate the value of that column.

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Or am i misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan <ymaha...@snappydata.io> wrote:
I had a similar issue and i think 

Apache Spark Cluster

2018-07-23 Thread Uğur Sopaoğlu
We are trying to create a cluster which consists of 4 machines. The cluster will
be used by multiple users. How can we configure it so that users can submit jobs
from their personal computers, and is there any free tool you can suggest to
help with this procedure?
-- 
Uğur Sopaoğlu


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-23 Thread kant kodali
I understand each row has a topic column, but can we write one row to multiple
topics?

On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan  wrote:

> What I meant was the number of partitions cannot be varied with
> ForeachWriter v/s if you were to write to each sink using independent
> queries. Maybe this is obvious.
>
> I am not sure about the difference you highlight about the performance
> part. The commit happens once per micro batch and "close(null)" is invoked.
> You can batch your writes in the process and/or in the close. I guess the
> writes can still be atomic and decided by whether “close” returns successfully
> or throws an exception.
>
> Thanks,
> Arun
>
> From: chandan prakash 
> Date: Thursday, July 12, 2018 at 10:37 AM
> To: Arun Iyer 
> Cc: Tathagata Das , "ymaha...@snappydata.io"
> , "priy...@asperasoft.com" ,
> "user @spark" 
>
> Subject: Re: [Structured Streaming] Avoiding multiple streaming queries
>
> Thanks a lot Arun for your response.
> I got your point that existing sink plugins like Kafka, etc. cannot be
> used.
> However I could not get the part: "you cannot scale the partitions for
> the sinks independently".
> Can you please rephrase that part?
>
> Also,
> I guess:
> using a ForeachWriter for multiple sinks will affect performance because
> writes happen to a sink on a per-record basis (after deciding which
> particular sink a record belongs to), whereas in the current implementation
> all data under an RDD partition gets committed to the sink atomically in one
> go. Please correct me if I am wrong here.
>
>
>
> Regards,
> Chandan
>
> On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan  wrote:
>
>> Yes ForeachWriter [1] could be an option If you want to write to
>> different sinks. You can put your custom logic to split the data into
>> different sinks.
>>
>> The drawback here is that you cannot plugin existing sinks like Kafka and
>> you need to write the custom logic yourself and you cannot scale the
>> partitions for the sinks independently.
>>
>> [1] https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html
>>
>> From: chandan prakash 
>> Date: Thursday, July 12, 2018 at 2:38 AM
>> To: Tathagata Das , "ymaha...@snappydata.io"
>> , "priy...@asperasoft.com" <
>> priy...@asperasoft.com>, "user @spark" 
>> Subject: Re: [Structured Streaming] Avoiding multiple streaming queries
>>
>> Hi,
>> Has anyone of you thought about writing a custom foreach sink writer
>> which can decide which record should go to which sink (based on some
>> marker in the record, which we can possibly annotate during transformation)
>> and then accordingly write to the specific sink?
>> This would mean that:
>> 1. every custom sink writer will have connections to as many sinks as
>> there are types of sink where records can go.
>> 2. every record will be read once in the single query but can be written
>> to multiple sinks
>>
>> Do you guys see any drawback in this approach?
>> One drawback, of course, is that the sink is supposed to write the
>> records as they are, but we are inducing some intelligence here in the sink.
>> Apart from that, do you see any other issues with this approach?
>>
>> Regards,
>> Chandan
>>
>>
>> On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Of course, you can write to multiple Kafka topics from a single query.
>>> If your dataframe that you want to write has a column named "topic" (along
>>> with "key", and "value" columns), it will write the contents of a row to
>>> the topic in that row. This automatically works. So the only thing you need
>>> to figure out is how to generate the value of that column.
>>>
>>> This is documented - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka
>>>
>>> Or am i misunderstanding the problem?
>>>
>>> TD
>>>
>>>
>>>
>>>
>>> On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan <ymaha...@snappydata.io> wrote:
>>>
 I had a similar issue and I think that’s where the structured streaming
 design falls short.
 Seems like Question#2 in your email is a viable workaround for you.

 In my case, I have a custom Sink backed by an efficient in-memory
 column store suited for fast ingestion.

 I have a Kafka stream coming from one topic, and I need to classify the
 stream based on schema.
 For example, a Kafka topic can have three different types of schema
 messages and I would like to ingest into the three different column
 tables(having different schema) using my custom Sink implementation.

 Right now the only(?) option I have is to create three streaming queries
 reading the same topic and ingesting into the respective column tables using
 their Sink implementations.
 These three streaming queries create underlying three
 IncrementalExecutions and three KafkaSources, and three queries reading the
 same data from the same Kafka topic.
 Even with CachedKafkaConsumers at partition