Re: Call Oracle Sequence using Spark

2019-08-16 Thread Nicolas Paris


> I have to call an Oracle sequence using Spark.

You might use JDBC and write your own lib in Scala.

I did such a thing for Postgres
(https://framagit.org/parisni/spark-etl/tree/master/spark-postgres);
see sqlExecWithResultSet
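
For illustration, a minimal sketch of that approach for the Oracle case (not
the spark-postgres code itself; the connection URL, credentials and the
sequence name MY_SEQ are placeholders, and the Oracle JDBC driver needs to be
on the classpath):

import java.sql.DriverManager

// Sketch: fetch the next value of a (hypothetical) Oracle sequence over plain JDBC.
val url  = "jdbc:oracle:thin:@//db-host:1521/ORCLPDB1"   // placeholder connection string
val conn = DriverManager.getConnection(url, "user", "password")
try {
  val rs = conn.createStatement().executeQuery("SELECT MY_SEQ.NEXTVAL FROM dual")
  if (rs.next()) println(s"next sequence value: ${rs.getLong(1)}")
} finally {
  conn.close()
}

The same pattern can be wrapped in a helper and called on the driver, or inside
mapPartitions if each partition needs its own connection.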


On Thu, Aug 15, 2019 at 10:58:11PM +0530, rajat kumar wrote:
> Hi All,
> 
> I have to call an Oracle sequence using Spark. Can you please tell me what is
> the way to do that?
> 
> Thanks
> Rajat

-- 
nicolas

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Stream is corrupted in ShuffleBlockFetcherIterator

2019-08-16 Thread Vadim Semenov
This is what you're looking for:

Handle large corrupt shuffle blocks
https://issues.apache.org/jira/browse/SPARK-26089

So until 3.0 the only way I can think of is to reduce the size of the shuffle
blocks or split your job into many smaller ones.
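
A rough sketch of what I mean, assuming a spark-shell session where largeDf is
the DataFrame feeding the failing shuffle (the partition count 2000 and the
column name "join_key" are placeholders to tune for your data volume):

import org.apache.spark.sql.functions.col

// Keep individual shuffle blocks small by using more, smaller shuffle partitions
// for SQL/DataFrame shuffles...
spark.conf.set("spark.sql.shuffle.partitions", "2000")

// ...and/or explicitly repartition the offending DataFrame before the wide operation.
val repartitioned = largeDf.repartition(2000, col("join_key"))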

On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin wrote:

> Hello, Spark community!
>
> I've been struggling with my job, which constantly fails due to an inability
> to uncompress some previously compressed blocks while shuffling data.
> I use Spark 2.2.0 with all the configuration settings left at their defaults (no
> specific compression codec is specified). I've ascertained that
> LZ4CompressionCodec is used as the default codec. The job fails as soon as
> the limit of attempts is exceeded, with the following message:
>
> Caused by: java.io.IOException: Stream is corrupted
> at
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
> at
> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
> at
> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
> at
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
> ... 28 more
> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of
> input buffer
>
>
> Actually, I've stumbled upon a bug [1] which is not fixed yet. Any clue on how
> to work around this issue? I've tried the Snappy codec, but it fails
> likewise with a slightly different message:
>
> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the
> chunk: FAILED_TO_UNCOMPRESS(5)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: failed to uncompress the chunk:
> FAILED_TO_UNCOMPRESS(5)
> at
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> at java.io.InputStream.read(InputStream.java:101)
> at
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at 

Spark on YARN with private Docker repositories/registries

2019-08-16 Thread Tak-Lon (Stephen) Wu
Hi guys,

Has anyone been using Spark (spark-submit) in YARN mode to pull
images from a private Docker repository/registry?

How do you pass in the Docker config.json that contains the auth tokens?
Or is there an environment variable that can be added to the system environment
so that the configuration is loaded from it by default?
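
For context, the kind of configuration I have in mind is roughly the following
(just a sketch of my assumption, not something I have confirmed: it relies on
the Hadoop 3.x Docker container runtime and its
YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG environment variable pointing at a
config.json that holds the registry auth tokens; the image name and HDFS path
are placeholders):

import org.apache.spark.SparkConf

// Sketch only: in practice these would be passed as --conf flags to spark-submit
// (or put in spark-defaults.conf) so they are in place before the AM and executors launch.
val conf = new SparkConf()
  .set("spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE", "docker")
  .set("spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", "registry.example.com/spark:latest")
  .set("spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG", "hdfs:///user/stephen/docker/config.json")
  .set("spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE", "docker")
  .set("spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE", "registry.example.com/spark:latest")
  .set("spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG", "hdfs:///user/stephen/docker/config.json")

Is that the right direction, or is there a better way?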

Thanks,
Stephen


Release Spark 2.3.4

2019-08-16 Thread Kazuaki Ishizaki
Hi, All.

Spark 2.3.3 was released six months ago (15th February, 2019) at
http://spark.apache.org/news/spark-2-3-3-released.html, and about 18
months have passed since Spark 2.3.0 was released (28th
February, 2018).
As of today (16th August), there are 103 commits (69 JIRAs) in `branch-2.3`
since 2.3.3.

It would be great if we could have Spark 2.3.4.
If it is OK, shall we start `2.3.4 RC1` concurrently with 2.4.4, or after
2.4.4 is released?

An issue list in JIRA:
https://issues.apache.org/jira/projects/SPARK/versions/12344844
A commit list on GitHub since the last release:
https://github.com/apache/spark/compare/66fd9c34bf406a4b5f86605d06c9607752bd637a...branch-2.3
The 8 correctness issues resolved in branch-2.3:
https://issues.apache.org/jira/browse/SPARK-26873?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012344844%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC

Best Regards,
Kazuaki Ishizaki



Re: Memory Limits error

2019-08-16 Thread Amit Sharma
Try increasing your driver memory to 12g.
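
For example, something along these lines at submit time (a rough sketch, the
values are only illustrative; on Spark 2.x the overhead property the YARN log
refers to is spark.yarn.executor.memoryOverhead):

import org.apache.spark.SparkConf

// Sketch only: driver/executor memory and the YARN overhead have to be fixed before
// the JVMs start, so in practice they go on the spark-submit command line
// (--driver-memory / --conf) or in spark-defaults.conf rather than being set in code.
val conf = new SparkConf()
  .set("spark.driver.memory", "12g")                 // the suggestion above
  .set("spark.executor.memory", "8g")                // illustrative
  .set("spark.yarn.executor.memoryOverhead", "2g")   // what the YARN error message suggests boosting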

On Thursday, August 15, 2019, Dennis Suhari wrote:

> Hi community,
>
> I am using Spark on YARN. When submitting a job, after a long time I get an
> error message and a retry.
>
> It happens when I want to store the dataframe to a table.
>
> spark_df.write.option("path", "/nlb_datalake/golden_zone/webhose/sentiment").saveAsTable("news_summary_test", mode="overwrite")
>
> The error is (after a long time):
>
> Hive Session ID = be590d1b-ed5b-404b-bcb4-77cbb977a847
> [Stage 2:> (0 + 16) / 16]
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_2 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_1 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_4 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_6 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_7 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_0 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_5 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_3 !
> 19/08/15 15:42:08 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 19/08/15 15:42:08 ERROR YarnScheduler: Lost executor 2 on nlb-srv-hd-08.i-lab.local: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 19/08/15 15:42:08 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 17, nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 19/08/15 15:42:08 WARN TaskSetManager: Lost task 5.0 in stage 2.0 (TID 26, nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2 exite
>
> Do you have a rough idea of where to tweak?
>
> Br,
>
> Dennis
>


Re: help understanding physical plan

2019-08-16 Thread Marcelo Valle
Thanks Tianlang. I saw the DAG on YARN, but what really solved my problem
was adding intermediate steps and evaluating them eagerly to find out where
the bottleneck was, roughly like the sketch below.
My process now runs in 6 min. :D
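
A sketch of that approach (smallDf, largeDf and the join column "id" stand in
for the real intermediate results of my job):

import org.apache.spark.sql.DataFrame

// Force each intermediate DataFrame with an action and time it, so the slow step
// shows up on its own instead of being hidden inside one long lineage.
def timed(label: String)(df: DataFrame): DataFrame = {
  val start  = System.nanoTime()
  val cached = df.persist()
  val rows   = cached.count()   // action: evaluates only this step
  println(f"$label: $rows%d rows in ${(System.nanoTime() - start) / 1e9}%.1f s")
  cached
}

val smallCached = timed("small side")(smallDf)
val result      = timed("left anti join")(largeDf.join(smallCached, Seq("id"), "left_anti"))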

Thanks for the help.

[]s

On Thu, 15 Aug 2019 at 07:25, Tianlang wrote:

> Hi,
>
> Maybe you can look at the Spark UI. The physical plan has no information
> about time consumption.
> On 2019/8/13 at 10:45 PM, Marcelo Valle wrote:
>
> Hi,
>
> I have a job running on AWS EMR. It's basically a join between 2 tables
> (parquet files on S3), one somewhat large (around 50 GB) and the other small
> (less than 1 GB).
> The small table is the result of other operations, but it is a dataframe
> with `.persist(StorageLevel.MEMORY_AND_DISK_SER)`, and the count on this
> dataframe finishes quickly.
> When I run my "LEFT_ANTI" join, I get the execution plan down below.
> While most of my jobs on larger amounts of data take at most 1 hour on this
> cluster, this one takes almost 1 day to complete.
>
> What could I be doing wrong? I am trying to analyze the plan, but I can't
> find anything that justifies the slowness. It has 2 shuffles followed by a
> zip, but other jobs have similar things and they are not that slow.
>
> Could anyone point me to possible actions I could take to investigate this?
>
> Thanks,
> Marcelo.
>
> == Physical Plan ==
> *(2) Project [USAGE_AGGREGATED_METADATA_ID#1493,
> SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702,
> USAGE_AGGREGATED_METADATA_HASH#1513]
> +- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ),
> coalesce(SENDER_RECORDING_IDENTIFIER#1499, )],
> [coalesce(USAGE_AGGREGATED_METADATA_ID#356, ),
> coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight,
> ((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493)
> && (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
>:- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493,
> SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
>: +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493,
> ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497,
> WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499,
> RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502,
> USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
>:   +- *(2) Project [ID#328 AS
> USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS
> ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS
> RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498,
> uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS
> RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS
> CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null,
> artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS
> USAGE_AGGREGATED_METADATA_HASH#1513]
>:  +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ),
> coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ),
> coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )],
> [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
> coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner,
> BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) &&
> (track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=>
> artist_name_1#1422)) && (work_writer_names#293 <=>
> work_writer_names_1#1423))
>: :- BroadcastExchange
> HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
> coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
> coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
>: :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419,
> iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421,
> artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS
> work_writer_names_1#1423]
>: : +- *(1) Filter isnotnull(ID#328)
>: :+- InMemoryTableScan [ID#328,
> artist_name#292, isrc#289, iswc#290, track_name#291,
> work_writer_names#293], [isnotnull(ID#328)]
>: :  +- InMemoryRelation [ID#328, isrc#289,
> iswc#290, track_name#291, artist_name#292, work_writer_names#293],
> StorageLevel(disk, memory, 1 replicas)
>: :+- *(2) Project [ID#328,
> isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
>: :   +- *(2) BroadcastHashJoin
> [coalesce(ISRC#329, ), coalesce(ISWC#330, ), coalesce(RECORDING_TITLE#331,
> ), coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, )],
> [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
> coalesce(substring(artist_name#292, 0, 1000), ),
> coalesce(work_writer_names#293, )], RightOuter, BuildLeft, (isrc#289
> <=> ISRC#329) &&