Re: JavaSerializerInstance is slow

2021-09-02 Thread Antonin Delpeuch (lists)
Hi Kohki,

Serialization of tasks happens in local mode too and as far as I am
aware there is no way to disable this (although it would definitely be
useful in my opinion).

You can see the local mode as a testing mode, in which you would want to
catch any serialization errors, before they appear in production.

There are also some important bugs that are present in local mode and
are not deemed worth fixing because it is not intended to be used in
production (https://issues.apache.org/jira/browse/SPARK-5300).

I think there would definitely be interest in having a reliable and
efficient local mode in Spark, but it's a pretty different use case from
what Spark originally focused on.

Antonin

On 03/09/2021 05:56, Kohki Nishio wrote:
> I'm seeing many threads doing deserialization of a task. I understand
> that since a lambda is involved, we can't use Kryo for this.
> However, I'm running in local mode, so this serialization is not really
> necessary, no?
>
> Is there any trick I can apply to get rid of this thread contention?
> I'm seeing many threads like the one below in thread dumps ...
>
>
> "Executor task launch worker for task 11.0 in stage 15472514.0 (TID 19788863)" #732821 daemon prio=5 os_prio=0 tid=0x7f02581b2800 nid=0x355d waiting for monitor entry [0x7effd1e3f000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:400)
>     - waiting to lock <0x7f0f7246edf8> (a java.lang.Object)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>     at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:51)
>     at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
>
>
> Thanks
> -Kohki

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



type mismatch

2021-09-02 Thread igyu
val schemas = createSchemas(config)
val arr = new Array[String](schemas.size())

lines.map(x => {
  val obj = JSON.parseObject(x)
  val vs = new Array[Any](schemas.size())
  for (i <- 0 until schemas.size()) {
    arr(i) = schemas.get(i).name
    vs(i) = obj.getString(schemas.get(i).name)
  }

  val seq = Seq(vs: _*)
  val record = Row.fromSeq(seq)
  record
})(Encoders.javaSerialization(Row.getClass))
  .toDF(arr: _*)

I get an error

type mismatch;
 found   : Class[?0] where type ?0 <: org.apache.spark.sql.Row.type
 required: Class[org.apache.spark.sql.Row]
})(Encoders.javaSerialization(Row.getClass))


igyu


JavaSerializerInstance is slow

2021-09-02 Thread Kohki Nishio
I'm seeing many threads doing deserialization of a task. I understand that
since a lambda is involved, we can't use Kryo for this. However, I'm
running in local mode, so this serialization is not really necessary, no?

Is there any trick I can apply to get rid of this thread contention? I'm
seeing many threads like the one below in thread dumps ...


"Executor task launch worker for task 11.0 in stage 15472514.0 (TID 19788863)" #732821 daemon prio=5 os_prio=0 tid=0x7f02581b2800 nid=0x355d waiting for monitor entry [0x7effd1e3f000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:400)
    - waiting to lock <0x7f0f7246edf8> (a java.lang.Object)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at scala.runtime.LambdaDeserializer$.deserializeLambda(LambdaDeserializer.scala:51)
    at scala.runtime.LambdaDeserialize.deserializeLambda(LambdaDeserialize.java:38)
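
To get a rough idea of how many threads are stuck on the same monitor, a saved thread dump can be scanned for its "waiting to lock" lines. The following is only a minimal Python sketch, assuming the dump uses the line format shown in the trace above; contended_monitors is a hypothetical helper name, not part of any JDK or Spark tooling.

```python
import re
from collections import Counter

# Matches lines such as: - waiting to lock <0x7f0f7246edf8> (a java.lang.Object)
WAITING = re.compile(r"- waiting to lock <(0x[0-9a-fA-F]+)>")


def contended_monitors(dump_text):
    """Count how many threads are waiting on each monitor address."""
    return Counter(WAITING.findall(dump_text))
```

Calling contended_monitors(text).most_common(3) on the text of a dump lists the monitor addresses with the most waiting threads.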


Thanks
-Kohki


Re: Appending a static dataframe to a stream create Parquet file fails

2021-09-02 Thread Jungtaek Lim
Hi,

The file stream sink maintains metadata in the output directory. The
metadata records the list of files written by the streaming query, and
Spark reads the metadata when listing the files to read.

This is to guarantee end-to-end exactly-once semantics when writing files
from a streaming query. The streaming query could fail and leave some
files partially written. The metadata lets Spark skip these files and read
only the files that were written correctly.

This leads to a major restriction: you can't write to the output directory
from multiple queries. In your case, Spark will only read the files that
were written by the streaming query.
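
To see this on disk, one can compare the Parquet files present in the output directory with the ones recorded by the sink. The sketch below is plain Python and only an illustration. It assumes the sink keeps its log in a `_spark_metadata` subdirectory, with one file per batch that holds a version line followed by one JSON object per written file carrying a `path` field. That layout is an assumption about Spark internals and may differ between versions; both function names are hypothetical.

```python
import json
import os


def files_listed_in_sink_log(output_dir):
    """Return base names of files recorded in the sink log (assumed layout)."""
    log_dir = os.path.join(output_dir, "_spark_metadata")
    listed = set()
    if not os.path.isdir(log_dir):
        return listed
    for name in os.listdir(log_dir):
        # Skip hidden temporary or checksum files.
        if name.startswith("."):
            continue
        with open(os.path.join(log_dir, name), encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                # Skip the version header and blank lines; keep JSON entries.
                if not line.startswith("{"):
                    continue
                entry = json.loads(line)
                listed.add(os.path.basename(entry["path"]))
    return listed


def files_ignored_by_reader(output_dir):
    """Parquet files on disk that the sink log does not mention."""
    log_dir = os.path.join(output_dir, "_spark_metadata")
    if not os.path.isdir(log_dir):
        return []  # no sink log, so nothing is filtered out
    listed = files_listed_in_sink_log(output_dir)
    on_disk = [f for f in os.listdir(output_dir) if f.endswith(".parquet")]
    return sorted(f for f in on_disk if f not in listed)
```

Under these assumptions, files appended by a batch write would show up in the result of files_ignored_by_reader, which would be consistent with the row count staying unchanged.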

There are 3rd party projects dealing with transactional writes from multiple
writers, (alphabetically) Apache Iceberg, Delta Lake, and so on. You may
want to check them out.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Sep 2, 2021 at 10:04 PM  wrote:

> Hi all,
>   I recently stumbled upon a rather strange problem with streaming
> sources in one of my tests. I am writing a Parquet file from a streaming
> source and subsequently try to append the same data, but this time from a
> static dataframe. Surprisingly, the number of rows in the Parquet file
> remains the same after the append operation.
> Here is the relevant code
>
>   "Appending data from static dataframe" must "produce twice as much data" in {
>     logLinesStream.writeStream
>       .format("parquet")
>       .option("path", path.toString)
>       .outputMode("append")
>       .start()
>       .processAllAvailable()
>     spark.read.format("parquet").load(path.toString).count mustBe 1159
>
>     logLinesDF.write.format("parquet").mode("append").save(path.toString)
>     spark.read.format("parquet").load(path.toString).count mustBe 2*1159
>   }
>
>
> Does anyone have an idea what I am doing wrong here?
>
> thanks in advance
>  Eugen Wintersberger
>


Unsubscribe

2021-09-02 Thread 周翔
Unsubscribe


Re: Can’t write to PVC in K8S

2021-09-02 Thread Bjørn Jørgensen
Well, I have tried almost everything the last 2 days now. 

There is no user spark, and whatever I do with the executor image it only runs 
for 2 minutes in k8s and then restarts. 


The problem seems to be that files written from the executors get the group nogroup:
drwxr-xr-x  2 185 nogroup 4096 Sep  2 18:43 test14


So is there anything that I can do with that? Or should I move on to minio or 
something else? 
I need to ETL 500 K - 94 GB of json files and save them somewhere. 
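
The listing above shows mode drwxr-xr-x, so only the owning user can write to that directory. A small check like the one below, run in both the notebook container and an executor container against the shared directory, would show which uid, gid and mode each side sees. It is a minimal Python sketch using only the standard library; describe_access is a hypothetical helper name.

```python
import os
import stat


def describe_access(path):
    """Return owner uid, group gid, mode string and group/other write bits."""
    st = os.stat(path)
    return {
        "uid": st.st_uid,
        "gid": st.st_gid,
        "mode": stat.filemode(st.st_mode),
        "group_writable": bool(st.st_mode & stat.S_IWGRP),
        "other_writable": bool(st.st_mode & stat.S_IWOTH),
    }
```

If group_writable is False, or the gid differs from the groups reported by id in the other container, a second user cannot write there regardless of which image it runs in.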

On 2021/08/31 21:09:25, Mich Talebzadeh  wrote: 
> I think Holden alluded to that.
> 
> In a nutshell, users in Linux can belong to more than one group. In this
> case you want to create a new group newgroup and add two users to that
> group. Do this in the Dockerfile as USER 0
> 
> RUN groupadd newgroup
> ## Now add the two users (these users need to exist)
> RUN usermod -a -G newgroup jovyan
> RUN usermod -a -G newgroup spark
> ## set permission on the directory
> RUN chgrp -R newgroup /path/to/the/directory
> RUN chmod -R 770 /path/to/the/directory
> 
> Check this thread as well
> 
> https://superuser.com/questions/280994/give-write-permissions-to-multiple-users-on-a-folder-in-ubuntu
> 
> HTH
> 
> 
> 
> 
> 
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 
> 
> 
> 
> On Tue, 31 Aug 2021 at 20:50, Holden Karau  wrote:
> 
> > You can change the UID of one of them to match, or you could add them both
> > to a group and set permissions to 770.
> >
> > On Tue, Aug 31, 2021 at 12:18 PM Bjørn Jørgensen 
> > wrote:
> >
> >> Hi and thanks for all the good help.
> >>
> >> I will build jupyter on top of spark to be able to run jupyter in local
> >> mode with the new koalas library. The new koalas library can be imported as
> >> "from pyspark import pandas as ps".
> >>
> >> Then you can run spark on K8S the same way that you use pandas in a
> >> notebook.
> >>
> >> The easiest way to get a PV in K8S is with NFS. And with NFS you will
> >> find your files outside K8S without having to copy files out of a K8S PVC.
> >>
> >> With this setup I can use pandas code in a notebook with the power from a
> >> K8S cluster, as a normal notebook with pandas code.
> >> I hope that this project will be an easy way to convert from pandas to
> >> spark on K8S.
> >>
> >>
> >> I did some testing today with file permissions, like RUN mkdir -p
> >> /home/files and RUN chmod g+w /home/files
> >> But
> >>
> >> 185@myapp-38a8887b9cedae97-exec-1:~/work-dir$ id
> >> uid=185(185) gid=0(root) groups=0(root)
> >>
> >>
> >> jovyan@my-pyspark-notebook-f6d497958-t9rpk:~$ id
> >> uid=1000(jovyan) gid=100(users) groups=100(users)
> >>
> >> so it didn't work.
> >>
> >> What will be the best way to make jovyan and 185 write to the same
> >> folder?
> >> On 2021/08/30 23:00:40, Mich Talebzadeh 
> >> wrote:
> >> > To be specific uid=185 (spark user, AKA anonymous) and root are in the same
> >> > group in the docker image itself
> >> >
> >> >
> >> > id
> >> >
> >> > uid=185(185) gid=0(root) groups=0(root)
> >> >
> >> >
> >> > So in the docker image conf file, you can create your permanent directory
> >> > as root off /home say
> >> >
> >> > do it as root (USER 0)
> >> >
> >> >
> >> > RUN mkdir -p /home/
> >> >
> >> > RUN chmod g+w /home/  ## give write permission to spark
> >> >
> >> >
> >> > ARG spark_uid=185
> >> > ..
> >> >
> >> > # Specify the User that the actual main process will run as
> >> >
> >> > USER ${spark_uid}
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > On Mon, 30 Aug 2021 at 22:26, Mich Talebzadeh <
> >> mich.talebza...@gmail.com>
> >> > wrote:
> >> >
> >> > > Forgot to mention that Spark uses that work directory to unzip the zipped
> >> > > files or gunzip archive files
> >> > >
> >> > > For example
> >> > >
> >> > > pyFiles gs://axial-glow-224522-spark-on-k8s/codes/DSBQ.zip
> >> > >
> >> > >
> >> > > Spark will use that $SPARK_HOME/work-dir to unzip DSBQ.zip which is the
> >> > > application package here
> >> > >
> >> > >
> >> > > The alternative is to hack the docker file to create a directory for
> >> > > yourself
> >> > >
> >> > >
> >> > > RUN 

Re: Get application metric from Spark job

2021-09-02 Thread Haryani, Akshay
Hi Aurélien,

Spark exposes application metrics through HTTP endpoints, which can be used 
as a REST API. You can read more about them here: 
https://spark.apache.org/docs/3.1.1/monitoring.html#rest-api

Additionally, if you want to build your own custom metrics, you can explore 
Spark plugins. Using a custom plugin, you can track your own metrics and plug 
them into the Spark metrics system. Please note that plugins are supported in 
Spark 3.0 and later.
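
As a rough illustration of the REST route, the sketch below polls the jobs endpoint described on the monitoring page and condenses the response. It is plain Python with only the standard library. The base address http://localhost:4040 is an assumption about where the driver UI runs, the helper names are hypothetical, and the sketch covers built-in job progress only, not custom counters.

```python
import json
import urllib.request


def fetch_json(url, timeout=5.0):
    """GET a URL and decode the JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def jobs_url(base_url, app_id):
    """Build the jobs endpoint of the monitoring REST API for one application."""
    return f"{base_url.rstrip('/')}/api/v1/applications/{app_id}/jobs"


def summarize_jobs(jobs):
    """Map each job id to (status, completed tasks, total tasks)."""
    return {
        job["jobId"]: (job["status"], job["numCompletedTasks"], job["numTasks"])
        for job in jobs
    }


def poll_once(base_url="http://localhost:4040"):
    """Fetch all applications served by the driver UI and summarize their jobs."""
    apps = fetch_json(f"{base_url.rstrip('/')}/api/v1/applications")
    return {app["id"]: summarize_jobs(fetch_json(jobs_url(base_url, app["id"]))) for app in apps}
```

Calling poll_once() at a fixed interval while the job runs gives a simple progress view.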


--
Thanks & Regards,
Akshay Haryani

From: Aurélien Mazoyer 
Date: Thursday, September 2, 2021 at 8:36 AM
To: user@spark.apache.org 
Subject: Get application metric from Spark job
Hi community,

I would like to collect information about the execution of a Spark job while it 
is running. Could I define some kind of application metrics (such as a counter 
that would be incremented in my code) that I could retrieve regularly while the 
job is running?

Thank you for help,

Aurelien


Reading CSV and Transforming to Parquet Issue

2021-09-02 Thread ☼ R Nair
All,

This is very surprising, and I am sure I might be doing something wrong. The
issue is that the following code takes 8 hours. It reads a CSV file, takes
the phone number column, extracts the first four digits and then
partitions based on the four digits (phoneseries) and writes to Parquet.
Any clue as to why? The CSV file has only about one million rows. Thanks in
advance. Spark version is 3.0.1

val df1 = spark.read.format("csv").option("header", "true").load("file:///sparkcode/myjobs/csvs/*.csv")

// The CSV above contains a column named phonenumber, which has a very wide range of values.
// The total number of rows in the CSV is just below one million. Also, the test was done with just one file.

val df = df1.withColumn("phoneseries", df1("phonenumbercolumn").substr(1, 5))

df.printSchema()  // Schema is printed correctly

df.write.option("header", "true").partitionBy("phoneseries").mode("append").parquet("file:///sparkcode/mydest/parquet")

Best,
Ravi


Get application metric from Spark job

2021-09-02 Thread Aurélien Mazoyer
Hi community,

I would like to collect information about the execution of a Spark job
while it is running. Could I define some kind of application metrics (such
as a counter that would be incremented in my code) that I could retrieve
regularly while the job is running?

Thank you for your help,

Aurelien


Appending a static dataframe to a stream create Parquet file fails

2021-09-02 Thread eugen . wintersberger
Hi all,
  I recently stumbled upon a rather strange problem with streaming
sources in one of my tests. I am writing a Parquet file from a
streaming source and subsequently try to append the same data, but this
time from a static dataframe. Surprisingly, the number of rows in the
Parquet file remains the same after the append operation.
Here is the relevant code

  "Appending data from static dataframe" must "produce twice as much data" in {
    logLinesStream.writeStream
      .format("parquet")
      .option("path", path.toString)
      .outputMode("append")
      .start()
      .processAllAvailable()
    spark.read.format("parquet").load(path.toString).count mustBe 1159

    logLinesDF.write.format("parquet").mode("append").save(path.toString)
    spark.read.format("parquet").load(path.toString).count mustBe 2*1159
  }

Does anyone have an idea what I am doing wrong here?

thanks in advance
 Eugen Wintersberger


Re: Connection reset by peer : failed to remove cache rdd

2021-09-02 Thread Harsh Sharma



On 2021/09/02 06:00:26, Harsh Sharma  wrote: 
> Please find my replies below:
>
> > Do you know when in your application lifecycle it happens? Spark SQL or
> > Structured Streaming?
>
> Ans: It's Spark SQL.
>
> > Do you use broadcast variables
>
> Ans: Yes, we are using broadcast variables.
>
> > or are the errors coming from broadcast joins perhaps?

Not sure about this.

> 
> On 2021/08/30 13:32:19, Jacek Laskowski  wrote: 
> > Hi,
> > 
> > No idea what might be going on here, but I'd not worry much about it and
> > simply monitor disk usage as some broadcast blocks might be left over.
> > 
> > Do you know when in your application lifecycle it happens? Spark SQL or
> > Structured Streaming? Do you use broadcast variables or are the errors
> > coming from broadcast joins perhaps?
> > 
> > Pozdrawiam,
> > Jacek Laskowski
> > 
> > https://about.me/JacekLaskowski
> > "The Internals Of" Online Books 
> > Follow me on https://twitter.com/jaceklaskowski
> > 
> > 
> > 
> > 
> > On Mon, Aug 30, 2021 at 3:26 PM Harsh Sharma 
> > wrote:
> > 
> > > We are facing issue in production where we are getting frequent
> > >
> > > Still have 1 request outstanding when connection with the hostname was
> > > closed
> > >
> > > connection reset by peer : errors as well as warnings  : failed to remove
> > > cache rdd or failed  to remove broadcast variable.
> > >
> > > Please help us how to mitigate this  :
> > >
> > > Executor memory : 12g
> > >
> > > Network timeout :   60
> > >
> > > Heartbeat interval : 25
> > >
> > >
> > >
> > > [Stage 284:>(199 + 1) / 200][Stage 292:>  (1 + 3)
> > > / 200]
> > > [Stage 284:>(199 + 1) / 200][Stage 292:>  (2 + 3)
> > > / 200]
> > > [Stage 292:>  (2 + 4)
> > > / 200][14/06/21 10:46:17,006 WARN
> > > shuffle-server-4](TransportChannelHandler) Exception in connection from
> > > 
> > > java.io.IOException: Connection reset by peer
> > > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> > > > at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> > > > at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> > > > at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> > > > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > at java.lang.Thread.run(Thread.java:748)
> > > [14/06/21 10:46:17,010 ERROR shuffle-server-4](TransportResponseHandler)
> > > Still have 1 requests outstanding when connection from  is 
> > > closed
> > > [14/06/21 10:46:17,012 ERROR Spark Context Cleaner](ContextCleaner) Error
> > > cleaning broadcast 159
> > > java.io.IOException: Connection reset by peer
> > > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > > at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> > > > at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> > > > at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> > > > at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> > > > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > at 

Unsubscribe

2021-09-02 Thread 孙乾(亨贞)
Unsubscribe



Spark Phoenix Connection Exception while loading from Phoenix tables

2021-09-02 Thread Harsh Sharma
[01/09/21 11:55:51,861 WARN  pool-1-thread-1](Client) Exception encountered while connecting to the server : java.lang.NullPointerException
[01/09/21 11:55:51,862 WARN  pool-1-thread-1](Client) Exception encountered while connecting to the server : java.lang.NullPointerException

[01/09/21 11:55:51,862 WARN  pool-1-thread-1](RetryInvocationHandler) Exception while invoking class org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo over server
Not retrying because failovers (15) exceeded maximum allowed (15)
java.io.IOException: Failed on local exception: java.io.IOException: java.lang.NullPointerException; Host Details : local host is:xx1 destination host is: xx2
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:776)
at org.apache.hadoop.ipc.Client.call(Client.java:1479)
at org.apache.hadoop.ipc.Client.call(Client.java:1412)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy18.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy19.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305






Re: Connection reset by peer : failed to remove cache rdd

2021-09-02 Thread Harsh Sharma
Please find my replies below:

> Do you know when in your application lifecycle it happens? Spark SQL or
> Structured Streaming?

Ans: It's Spark SQL.

> Do you use broadcast variables

Ans: Yes, we are using broadcast variables.

> or are the errors coming from broadcast joins perhaps?

Ans: We are not using broadcast joins.

On 2021/08/30 13:32:19, Jacek Laskowski  wrote: 
> Hi,
> 
> No idea what might be going on here, but I'd not worry much about it and
> simply monitor disk usage as some broadcast blocks might be left over.
> 
> Do you know when in your application lifecycle it happens? Spark SQL or
> Structured Streaming? Do you use broadcast variables or are the errors
> coming from broadcast joins perhaps?
> 
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books 
> Follow me on https://twitter.com/jaceklaskowski
> 
> 
> 
> 
> On Mon, Aug 30, 2021 at 3:26 PM Harsh Sharma 
> wrote:
> 
> > We are facing issue in production where we are getting frequent
> >
> > Still have 1 request outstanding when connection with the hostname was
> > closed
> >
> > connection reset by peer : errors as well as warnings  : failed to remove
> > cache rdd or failed  to remove broadcast variable.
> >
> > Please help us how to mitigate this  :
> >
> > Executor memory : 12g
> >
> > Network timeout :   60
> >
> > Heartbeat interval : 25
> >
> >
> >
> > [Stage 284:>(199 + 1) / 200][Stage 292:>  (1 + 3)
> > / 200]
> > [Stage 284:>(199 + 1) / 200][Stage 292:>  (2 + 3)
> > / 200]
> > [Stage 292:>  (2 + 4)
> > / 200][14/06/21 10:46:17,006 WARN
> > shuffle-server-4](TransportChannelHandler) Exception in connection from
> > 
> > java.io.IOException: Connection reset by peer
> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> > at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> > at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> > at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > at java.lang.Thread.run(Thread.java:748)
> > [14/06/21 10:46:17,010 ERROR shuffle-server-4](TransportResponseHandler)
> > Still have 1 requests outstanding when connection from  is closed
> > [14/06/21 10:46:17,012 ERROR Spark Context Cleaner](ContextCleaner) Error
> > cleaning broadcast 159
> > java.io.IOException: Connection reset by peer
> > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> > at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> > at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:313)
> > at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
> > at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
> > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > at java.lang.Thread.run(Thread.java:748)
> > [14/06/21 10:46:17,012 WARN
> > block-manager-ask-thread-pool-69](BlockManagerMaster) Failed to remove
> > broadcast 159 with removeFromMaster = true - Connection reset by peer
> > java.io.IOException: Connection reset by