unsubscribe

2017-11-21 Thread 韩盼
unsubscribe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Caching dataframes and overwrite

2017-11-21 Thread Michael Artz
I have been interested in finding out why I am getting strange behavior
when running a certain spark job. The job will error out if I place an
action (A .show(1) method) either right after caching the DataFrame or
right before writing the dataframe back to hdfs. There is a very similar
post to Stackoverflow post here... Spark SQL SaveMode.Overwrite, getting
java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName'

.

Basically the other post explains, that when you read from the same hdfs
directory that you are writing to, and your SaveMode is "overwrite", then
you will get a java.io.FileNotFoundException. But here I am finding that
just moving where in the program the action is can give very different
results - either completing the program or giving this exception. I was
wondering if anyone can explain why Spark is not being consistent here?

 val myDF = spark.read.format("csv")
.option("header", "false")
.option("delimiter", "\t")
.schema(schema)
.load(myPath)

// If I cache it here or persist it then do an action after the cache,
it will occasionally
// not throw the error. This is when completely restarting the
SparkSession so there is no
// risk of another user interfering on the same JVM.
  myDF.cache()
  myDF.show(1)
// Below is just meant to be showing that we're are doing other "spark
dataframe transformations",
// but different transformations have both led to the weird behavior
so, I'm not being specific about
// what exactly the dataframe transformations are

val secondDF = mergeOtherDFsWithmyDF(myDF, otherDF, thirdDF)

val fourthDF = mergeTwoDFs(thirdDF, StringToCheck, fifthDF)

// Below is the same .show(1) action call as was previously done, only
this below
// action ALWAYS results in a successful completion and the above
.show(1) sometimes results
// in FileNotFoundException and sometimes results in successful
completion. The only
// thing that changes among test runs is only one is executed. Either
// **fourthDF.show(1) or myDF.show(1) is left commented out**
fourthDF.show(1)
fourthDF.write
.mode(writeMode)
.option("header", "false")
.option("delimiter", "\t")
.csv(myPath)


Re: What do you pay attention to when validating Spark jobs?

2017-11-21 Thread lucas.g...@gmail.com
I don't think these will blow anyones minds but:

1) Row counts.  Most of our jobs 'recompute the world' nightly so we can
expect to see fairly predictable row variances.
2) Rolling snapshots.  We can also expect that for some critical datasets
we can compute a rolling average for important metrics (revenue, user
count, etc).  We're just starting to investigate this.
3) Job timing:  Jobs should normally take about the same amount of time to
execute (usually).  So we want to alert on things that finish too quickly
(no data in the pipe) or things that take too long.

I'd like to get further into anomaly detection but haven't gotten there yet.

On 21 November 2017 at 15:34, Holden Karau  wrote:

> Hi Folks,
>
> I'm working on updating a talk and I was wondering if any folks in the
> community wanted to share their best practices for validating your Spark
> jobs? Are there any counters folks have found useful for
> monitoring/validating your Spark jobs?
>
> Cheers,
>
> Holden :)
>
> --
> Twitter: https://twitter.com/holdenkarau
>


What do you pay attention to when validating Spark jobs?

2017-11-21 Thread Holden Karau
Hi Folks,

I'm working on updating a talk and I was wondering if any folks in the
community wanted to share their best practices for validating your Spark
jobs? Are there any counters folks have found useful for
monitoring/validating your Spark jobs?

Cheers,

Holden :)

-- 
Twitter: https://twitter.com/holdenkarau


Re: Writing files to s3 with out temporary directory

2017-11-21 Thread Jim Carroll
I got it working. It's much faster.

If someone else wants to try it I:
1) Was already using the code from the Presto S3 Hadoop FileSystem
implementation modified to sever it from the rest of the Presto codebase.
2) I extended it and overrode the method "keyFromPath" so that anytime the
Path referred to a "_temporary" parquet file "part" it returned a "key" to
the final location of the file.
3) I registered the filesystem through sparkContext.hadoopConfiguration by
setting fs.s3.impl, fs.s3n.impl, and fs.s3a.impl.

I realize I'm risking a file corruption but it's WY faster than it was.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Custom Data Source for getting data from Rest based services

2017-11-21 Thread Sourav Mazumder
Hi All,

Need your thoughts/inputs on a custom Data Source for accessing Rest based
services in parallel using Spark.

Many a times for business applications (batch oriented) one has to call a
target Rest service for a high number of times (with different set of
values of parameters/KV pairs).

The example use cases for the same are -

- Getting results/prediction from Machine Learning/NLP systems,
- Accessing utility APIs (like address validation) in bulk for 1000s of
inputs
- Ingesting data from systems who support only parametric data query (say
for time series data),
- Indexing data to Search systems
- Web crawling
- Accessing business applications which do not support bulk download
- others 

Typically, for these use cases, the number of time the Service is called
(with various parameters/data) can be high. So people use/develop a
parallel processing framework (specific to his/her choice of language) to
call the APIs in parallel. But typically it is hard to make such thing run
in a distributed manner using multiple machines.

I found Spark's distributed programming paradigm can be used in a great way
for this. And was trying to create a custom Data Source for the same. Here
is the link to the repo -
https://github.com/sourav-mazumder/Data-Science-Extensions/tree/master/spark-datasource-rest

The interface goes like this :
- Inputs : REST API endpoint URL, input Data in a Temporary Spark Table -
the name of the table has to be passed, type of method (Get, Post, Put or
Delete), userid/password (for the sites which need authentication),
connection parameters (connection time, read time), parameter to call the
target Rest API only once (useful for services for which you have to pay or
have a daily/hourly limit)
- Output : A DataFrame with Rows of Struct. The Struct will have the output
returned by the target API.

Any thoughts/inputs on this  ?
a) Will this be useful for the applications/use cases you develop ?
b) What you typically use to address this type of needs ?
c) What else should be considered to make this framework more generic
/useful ?

Regards,
Sourav

P.S. I found this resource (https://www.alibabacloud.com/forum/read-474)
where the similar requirement is discussed and a solution is proposed. Not
sure what is the status of the proposal. However, some more things I found
need to be addressed in that proposal -
a) The proposal covers calling the Rest API for one set of key/value
parameter. In the above approach one can call same Rest API multiple times
with different sets of values of the keys.
b) There should be an option where Rest API should be called only once for
a given set of key/value parameters. This is important as many a times one
has to pay for accessing a Rest API and also there may be a limit per
day/hour basis.
c) Does not support calling a Rest service which is based on Post or other
HTTP methods
d) The results in other formats (like xml, csv) cannot be addressed


Re: Process large JSON file without causing OOM

2017-11-21 Thread Alec Swan
Pinging back to see if anybody could provide me with some pointers on hot
to stream/batch JSON-to-ORC conversion in Spark SQL or why I get an OOM
dump with such small memory footprint?

Thanks,

Alec

On Wed, Nov 15, 2017 at 11:03 AM, Alec Swan  wrote:

> Thanks Steve and Vadim for the feedback.
>
> @Steve, are you suggesting creating a custom receiver and somehow piping
> it through Spark Streaming/Spark SQL? Or are you suggesting creating
> smaller datasets from the stream and using my original code to process
> smaller datasets? It'd be very helpful for a novice, like myself, if you
> could provide code samples or links to docs/articles.
>
> @Vadim, I ran my test with local[1] and got OOM in the same place. What
> puzzles me is that when I expect the heap dump with VisualVM (see below) it
> says that the heap is pretty small ~35MB. I am running my test with
> "-Xmx10G -Dspark.executor.memory=6g  -Dspark.driver.memory=6g" JVM opts and
> I can see them reflected in Spark UI. Am I missing some memory settings?
>
> Date taken: Wed Nov 15 10:46:06 MST 2017
> File: /tmp/java_pid69786.hprof
> File size: 59.5 MB
>
> Total bytes: 39,728,337
> Total classes: 15,749
> Total instances: 437,979
> Classloaders: 123
> GC roots: 2,831
> Number of objects pending for finalization: 5,198
>
>
> Thanks,
>
> Alec
>
> On Wed, Nov 15, 2017 at 11:15 AM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> There's a lot of off-heap memory involved in decompressing Snappy,
>> compressing ZLib.
>>
>> Since you're running using `local[*]`, you process multiple tasks
>> simultaneously, so they all might consume memory.
>>
>> I don't think that increasing heap will help, since it looks like you're
>> hitting system memory limits.
>>
>> I'd suggest trying to run with `local[2]` and checking what's the memory
>> usage of the jvm process.
>>
>> On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan  wrote:
>>
>>> Hello,
>>>
>>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>>> format. Effectively, my Java service starts up an embedded Spark cluster
>>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>>> keep getting OOM errors with large (~1GB) files.
>>>
>>> I've tried different ways to reduce memory usage, e.g. by partitioning
>>> data with dataSet.partitionBy("customer).save(filePath), or capping
>>> memory usage by setting spark.executor.memory=1G, but to no vail.
>>>
>>> I am wondering if there is a way to avoid OOM besides splitting the
>>> source JSON file into multiple smaller ones and processing the small ones
>>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>>> in it's entirety before converting it to ORC (columnar)? If so, would it
>>> make sense to create a custom receiver that reads the Snappy file and use
>>> Spark streaming for ORC conversion?
>>>
>>> Thanks,
>>>
>>> Alec
>>>
>>
>>
>


Re: Spark/Parquet/Statistics question

2017-11-21 Thread Rabin Banerjee
Spark is not adding any STAT meta in parquet files in Version 1.6.x.
Scanning all files for filter.

(1 to 30).map(i => (i, i.toString)).toDF("a",
"b").sort("a").coalesce(1).write.format("parquet").saveAsTable("metrics")

./parquet-meta /user/hive/warehouse/metrics/*.parquet

file:
file:/user/hive/warehouse/metrics/part-r-0-6552bc8f-ec05-4ce8-ad8d-dc22bdd2e502.gz.parquet


creator: parquet-mr version 1.6.0

extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"b","type":"string","nullable":true,"metadata":{}}]}



file schema: spark_schema



a:   OPTIONAL INT32 R:0 D:1

b:   OPTIONAL BINARY O:UTF8 R:0 D:1


row group 1: RC:30 TS:4089139 OFFSET:4



a:INT32 GZIP DO:0 FPO:4 SZ:415087/1200095/2.89 VC:30
ENC:BIT_PACKED,RLE,PLAIN

b:BINARY GZIP DO:0 FPO:415091 SZ:667334/2889044/4.33 VC:30
ENC:BIT_PACKED,RLE,PLAIN

On Tue, Jan 17, 2017 at 9:41 PM, Michael Segel 
wrote:

> Hi,
> Lexicographically speaking, Min/Max should work because String(s)  support
> a comparator operator.  So anything which supports an equality test (<,>,
> <= , >= , == …) can also support min and max functions as well.
>
> I guess the question is if Spark does support this, and if not, why?
> Yes, it makes sense.
>
>
>
> > On Jan 17, 2017, at 9:17 AM, Jörn Franke  wrote:
> >
> > Hallo,
> >
> > I am not sure what you mean by min/max for strings. I do not know if
> this makes sense. What the ORC format has is bloom filters for strings etc.
> - are you referring to this?
> >
> > In order to apply min/max filters Spark needs to read the meta data of
> the file. If the filter is applied or not - this you can see from the
> number of bytes read.
> >
> >
> > Best regards
> >
> >> On 17 Jan 2017, at 15:28, djiang  wrote:
> >>
> >> Hi,
> >>
> >> I have been looking into how Spark stores statistics (min/max) in
> Parquet as
> >> well as how it uses the info for query optimization.
> >> I have got a few questions.
> >> First setup: Spark 2.1.0, the following sets up a Dataframe of 1000
> rows,
> >> with a long type and a string type column.
> >> They are sorted by different columns, though.
> >>
> >> scala> spark.sql("select id, cast(id as string) text from
> >> range(1000)").sort("id").write.parquet("/secret/spark21-sortById")
> >> scala> spark.sql("select id, cast(id as string) text from
> >> range(1000)").sort("Text").write.parquet("/secret/spark21-sortByText")
> >>
> >> I added some code to parquet-tools to print out stats and examine the
> >> generated parquet files:
> >>
> >> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> >> /secret/spark21-sortById/part-0-39f7ac12-6038-46ee-b5c3-
> d7a5a06e4425.snappy.parquet
> >> file:
> >> file:/secret/spark21-sortById/part-0-39f7ac12-6038-46ee-
> b5c3-d7a5a06e4425.snappy.parquet
> >> creator: parquet-mr version 1.8.1 (build
> >> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> >> extra:   org.apache.spark.sql.parquet.row.metadata =
> >> {"type":"struct","fields":[{"name":"id","type":"long","
> nullable":false,"metadata":{}},{"name":"text","type":"
> string","nullable":false,"metadata":{}}]}
> >>
> >> file schema: spark_schema
> >> 
> 
> >> id:  REQUIRED INT64 R:0 D:0
> >> text:REQUIRED BINARY O:UTF8 R:0 D:0
> >>
> >> row group 1: RC:5 TS:133 OFFSET:4
> >> 
> 
> >> id:   INT64 SNAPPY DO:0 FPO:4 SZ:71/81/1.14 VC:5
> >> ENC:PLAIN,BIT_PACKED STA:[min: 0, max: 4, num_nulls: 0]
> >> text: BINARY SNAPPY DO:0 FPO:75 SZ:53/52/0.98 VC:5
> >> ENC:PLAIN,BIT_PACKED
> >>
> >> hadoop jar parquet-tools-1.9.1-SNAPSHOT.jar meta
> >> /secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-
> b8a6-d67cc38a2bde.snappy.parquet
> >> file:
> >> file:/secret/spark21-sortByText/part-0-3d7eac74-5ca0-44a0-b8a6-
> d67cc38a2bde.snappy.parquet
> >> creator: parquet-mr version 1.8.1 (build
> >> 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
> >> extra:   org.apache.spark.sql.parquet.row.metadata =
> >> {"type":"struct","fields":[{"name":"id","type":"long","
> nullable":false,"metadata":{}},{"name":"text","type":"
> string","nullable":false,"metadata":{}}]}
> >>
> >> file schema: spark_schema
> >> 
> 
> >> id:  REQUIRED INT64 R:0 D:0
> >> text:REQUIRED BINARY O:UTF8 R:0 D:0
> >>
> >> row group 1: RC:5 TS:140 OFFSET:4
> >> 

Parquet Filter pushdown not working and statistics are not generating for any column with Spark 1.6 CDH 5.7

2017-11-21 Thread Rabin Banerjee
Hi All ,


 I am using CDH 5.7 which comes with Spark version 1.6.0.  I am saving my
data set as parquet data and then querying it . Query is executing fine But
when I checked the files generated by spark, I found statistics(min/max) is
missing for all the columns . And hence filters are not pushed down. Its
scanning the entire file.


*(1 to 3).map(i => (i, i.toString)).toDF("a",
"b").sort("a").write.parquet("/hdfs/path/to/store")*


*parquet-tools meta
part-r-00186-03addad8-c19d-4812-b83b-a8708606183b.gz.parquet*

creator: p*arquet-mr version 1.5.0-cdh5.7.1* (build ${buildNumber})

extra:   org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}},{"name":"b","type":"string","nullable":true,"metadata":{}}]}



file schema: spark_schema

--

a:   OPTIONAL INT32 R:0 D:1

b:   OPTIONAL BINARY O:UTF8 R:0 D:1


row group 1: RC:148 TS:2012

--

a:INT32 GZIP DO:0 FPO:4 SZ:297/635/2.14 VC:148
ENC:BIT_PACKED,PLAIN,RLE

b:BINARY GZIP DO:0 FPO:301 SZ:301/1377/4.57 VC:148
ENC:BIT_PACKED,PLAIN,RLE


As you can see from the parquet meta the STA field is missing. And spark is
scanning all data of all files.

Any suggestion ?


Thanks //

RB


Re: Writing files to s3 with out temporary directory

2017-11-21 Thread Jim Carroll
It's not actually that tough. We already use a custom Hadoop FileSystem for
S3 because when we started using Spark with S3 the native FileSystem was
very unreliable. Our's is based on the code from Presto. (see
https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java
). 

I already have a version that introduces a hash to the filename for the file
that's actually written to the S3 to see if it makes a difference per
https://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html#get-workload-considerations
. FWIW, it doesn't. I'm going to modify that experiment to override the key
name like before except actually mode the file, keep track of the state, and
override the rename method.

The problems with this approach are: 1) it's brittle because it depends on
the internal directory and file naming conventions in Hadoop and Parquet. 2)
It will assume (as seems to be currently the case) that the 'rename' call is
done for all files from the driver. But it should do until there's a better
solution in the Hadoop committer.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dynamic data ingestion into SparkSQL - Interesting question

2017-11-21 Thread Aakash Basu
Yes, I did the same. It's working. Thanks!

On 21-Nov-2017 4:04 PM, "Fernando Pereira"  wrote:

> Did you consider do string processing to build the SQL expression which
> you can execute with spark.sql(...)?
> Some examples: https://spark.apache.org/docs/latest/sql-
> programming-guide.html#hive-tables
>
> Cheers
>
> On 21 November 2017 at 03:27, Aakash Basu 
> wrote:
>
>> Hi all,
>>
>> Any help? PFB.
>>
>> Thanks,
>> Aakash.
>>
>> On 20-Nov-2017 6:58 PM, "Aakash Basu"  wrote:
>>
>>> Hi all,
>>>
>>> I have a table which will have 4 columns -
>>>
>>> |  Expression|filter_condition| from_clause|
>>> group_by_columns|
>>>
>>>
>>> This file may have variable number of rows depending on the no. of KPIs
>>> I need to calculate.
>>>
>>> I need to write a SparkSQL program which will have to read this file and
>>> run each line of queries dynamically by fetching each column value for a
>>> particular row and create a select query out of it and run inside a
>>> dataframe, later saving it as a temporary table.
>>>
>>> Did anyone do this kind of exercise? If yes, can I get some help on it
>>> pls?
>>>
>>> Thanks,
>>> Aakash.
>>>
>>
>


Re: Dynamic data ingestion into SparkSQL - Interesting question

2017-11-21 Thread Fernando Pereira
Did you consider do string processing to build the SQL expression which you
can execute with spark.sql(...)?
Some examples:
https://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

Cheers

On 21 November 2017 at 03:27, Aakash Basu 
wrote:

> Hi all,
>
> Any help? PFB.
>
> Thanks,
> Aakash.
>
> On 20-Nov-2017 6:58 PM, "Aakash Basu"  wrote:
>
>> Hi all,
>>
>> I have a table which will have 4 columns -
>>
>> |  Expression|filter_condition| from_clause|
>> group_by_columns|
>>
>>
>> This file may have variable number of rows depending on the no. of KPIs I
>> need to calculate.
>>
>> I need to write a SparkSQL program which will have to read this file and
>> run each line of queries dynamically by fetching each column value for a
>> particular row and create a select query out of it and run inside a
>> dataframe, later saving it as a temporary table.
>>
>> Did anyone do this kind of exercise? If yes, can I get some help on it
>> pls?
>>
>> Thanks,
>> Aakash.
>>
>


Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-21 Thread Chetan Khatri
Hello Spark Users,

I am getting below error, when i am trying to write dataset to parquet
location. I have enough disk space available. Last time i was facing same
kind of error which were resolved by increasing number of cores at hyper
parameters. Currently result set data size is almost 400Gig with below
hyper parameters

Driver memory: 4g
Executor Memory: 16g
Executor cores=12
num executors= 8

Still it's failing, any Idea ? that if i increase executor memory and
number of executors.  it could get resolved ?


17/11/21 04:29:37 ERROR storage.DiskBlockObjectWriter: Uncaught exception
while reverting partial writes to file /mapr/chetan/local/
david.com/tmp/hadoop/nm-local-dir/usercache/david-khurana/appcache/application_1509639363072_10572/blockmgr-008604e6-37cb-421f-8cc5-e94db75684e7/12/temp_shuffle_ae885911-a1ef-404f-9a6a-ded544bb5b3c
java.io.IOException: Disk quota exceeded
at java.io.FileOutputStream.close0(Native Method)
at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
at java.io.FileOutputStream.close(FileOutputStream.java:354)
at
org.apache.spark.storage.TimeTrackingOutputStream.close(TimeTrackingOutputStream.java:72)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at
net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStream.java:178)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
at
org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2.close(UnsafeRowSerializer.scala:96)
at
org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$close$2.apply$mcV$sp(DiskBlockObjectWriter.scala:108)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1316)
at
org.apache.spark.storage.DiskBlockObjectWriter.close(DiskBlockObjectWriter.scala:107)
at
org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:159)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:234)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
17/11/21 04:29:37 WARN netty.OneWayOutboxMessage: Failed to send one-way
RPC.
java.io.IOException: Failed to connect to /192.168.123.43:58889
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:228)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
at
org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:197)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:191)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:187)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: /
192.168.123.43:58889
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
  ... 1 more