Stream is corrupted in ShuffleBlockFetcherIterator

2019-08-15 Thread Mikhail Pryakhin
Hello, Spark community!

I've been struggling with a job that constantly fails because it is unable to 
uncompress some previously compressed blocks while shuffling data. 
I use Spark 2.2.0 with all configuration settings left at their defaults (no 
specific compression codec is specified). I've ascertained that 
LZ4CompressionCodec is used as the default codec. The job fails as soon as the 
retry limit is exceeded, with the following message:

Caused by: java.io.IOException: Stream is corrupted
at 
org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
at 
org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
... 28 more
Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of input 
buffer


It looks like I've stumbled upon bug [1], which is not fixed yet. Any clue on how 
to work around this issue? I've tried the Snappy codec, but it fails likewise 
with a slightly different message:

org.apache.spark.shuffle.FetchFailedException: failed to uncompress the chunk: 
FAILED_TO_UNCOMPRESS(5)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: failed to uncompress the chunk: 
FAILED_TO_UNCOMPRESS(5)
at 
org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
at 
org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
at java.io.InputStream.read(InputStream.java:101)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
at 
org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at 
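For anyone hitting the same trace: a minimal sketch (not from the thread, and only 
a diagnostic step rather than a fix) of switching the compression codec via the 
session config; valid values in Spark 2.2 include lz4 (the default), lzf, and snappy:

import org.apache.spark.sql.SparkSession

// Assumption: lzf here is just an alternative codec to compare against, not a
// cure for the underlying corruption.
val spark = SparkSession.builder()
  .appName("shuffle-codec-test")
  .config("spark.io.compression.codec", "lzf")
  .getOrCreate()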

Call Oracle Sequence using Spark

2019-08-15 Thread rajat kumar
Hi All,

I have to call an Oracle sequence from Spark. Can you please tell me the
way to do that?

Thanks
Rajat
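
One possible approach (a sketch, not a confirmed answer): if a single sequence value 
is needed on the driver, plain JDBC is the simplest route. The connection details and 
the sequence name MY_SEQ below are hypothetical, and the Oracle JDBC driver must be 
on the classpath.

import java.sql.DriverManager

// Fetch the next value of a (hypothetical) Oracle sequence on the driver.
val conn = DriverManager.getConnection(
  "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB", "scott", "tiger")
try {
  val rs = conn.createStatement().executeQuery("SELECT MY_SEQ.NEXTVAL FROM dual")
  rs.next()
  println(s"next sequence value: ${rs.getLong(1)}")
} finally {
  conn.close()
}

Note that Oracle restricts where NEXTVAL may appear (it is not allowed in many 
subqueries), so pushing it down inside a Spark JDBC subquery does not always work; 
generating one value per DataFrame row is a different problem.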


Memory Limits error

2019-08-15 Thread Dennis Suhari
Hi community,

I am using Spark on YARN. When submitting a job, after a long time I get an error 
message and a retry.

It happens when I want to write the dataframe to a table.

spark_df.write.option("path", 
"/nlb_datalake/golden_zone/webhose/sentiment").saveAsTable("news_summary_test", 
mode="overwrite") 

The error is (after a long time):

Hive Session ID = be590d1b-ed5b-404b-bcb4-77cbb977a847
[Stage 2:> (0 + 16) / 16]
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_2 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_1 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_4 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_6 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_7 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_0 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_5 !
19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_3 !
19/08/15 15:42:08 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
19/08/15 15:42:08 ERROR YarnScheduler: Lost executor 2 on nlb-srv-hd-08.i-lab.local: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
19/08/15 15:42:08 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 17, nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
19/08/15 15:42:08 WARN TaskSetManager: Lost task 5.0 in stage 2.0 (TID 26, nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2 exite

Do you have a rough idea where to tweak?

Br,

Dennis
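
One knob the log itself points at (a sketch, not a tuned recommendation; the 2048 MiB 
value is only illustrative, and this is typically passed with spark-submit --conf 
rather than in code) is the per-executor off-heap headroom on YARN:

import org.apache.spark.sql.SparkSession

// Raise the YARN memory overhead (a value in MiB on Spark 2.x). The container was
// killed at 9.1 GB against a 9 GB limit, so executor memory plus overhead must stay
// below what YARN grants the container.
val spark = SparkSession.builder()
  .appName("news_summary_test")
  .config("spark.yarn.executor.memoryOverhead", "2048")
  .getOrCreate()

Other common levers are lowering spark.executor.memory, or increasing the number of 
partitions so each task holds less data at a time.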

Spark streaming kafka source delay occasionally

2019-08-15 Thread ans
I'm using the Kafka consumer with 2-minute batches. Tasks take 2~5 seconds to
process in general, but some tasks take more than 40 seconds. I guess
*CachedKafkaConsumer#poll* could be the problem.

private def poll(timeout: Long): Unit = {
  val p = consumer.poll(timeout)
  val r = p.records(topicPartition)
  logDebug(s"Polled ${p.partitions()}  ${r.size}")
  buffer = r.iterator
}

I made changes to that method to add a timing log:

private def poll(timeout: Long): Unit = {
  val start = System.currentTimeMillis()
  val p = consumer.poll(timeout)
  val end = System.currentTimeMillis()
  val r = p.records(topicPartition)
  logInfo(s"Polled ${p.partitions()}  ${r.size} cost ${end-start}" +
    s" ${System.currentTimeMillis() - end} timeout setting ${timeout}")
  buffer = r.iterator
}

task log below:
19/08/15 16:46:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned
task 124
19/08/15 16:46:00 INFO executor.Executor: Running task 3.0 in stage 10.0
(TID 124)
19/08/15 16:46:00 INFO broadcast.TorrentBroadcast: Started reading broadcast
variable 10
19/08/15 16:46:00 INFO memory.MemoryStore: Block broadcast_10_piece0 stored
as bytes in memory (estimated size 2.1 KB, free 366.2 MB)
19/08/15 16:46:00 INFO broadcast.TorrentBroadcast: Reading broadcast
variable 10 took 10 ms
19/08/15 16:46:00 INFO memory.MemoryStore: Block broadcast_10 stored as
values in memory (estimated size 3.6 KB, free 366.2 MB)
19/08/15 16:46:00 INFO kafka010.KafkaRDD: Computing topic test_topic,
partition 9 offsets 4382382407 -> 4382457892
*19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
705 cost 40050 0 timeout setting 12*
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
691 cost 26 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
725 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
714 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
721 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
709 cost 13 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
719 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
734 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
728 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
724 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
719 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
738 cost 12 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
709 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
701 cost 18 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
718 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
734 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
702 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
717 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
711 cost 17 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
715 cost 33 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
726 cost 11 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
713 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
697 cost 16 0 timeout setting 12
19/08/15 16:46:40 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
719 cost 22 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
750 cost 13 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
707 cost 11 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
743 cost 13 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
746 cost 12 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
737 cost 11 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled [test_topic-9] 
715 cost 17 0 timeout setting 12
19/08/15 16:46:41 INFO kafka010.CachedKafkaConsumer: Polled 
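
For what it's worth (a sketch under assumptions, not a fix): in the kafka010 
integration, the poll timeout used by CachedKafkaConsumer comes from 
spark.streaming.kafka.consumer.poll.ms, falling back to spark.network.timeout. 
Lowering it only bounds how long a poll may block; a 40-second poll that eventually 
succeeds points at the brokers or the network rather than at Spark itself.

import org.apache.spark.SparkConf

// Illustrative value only: cap how long each Kafka poll may block.
val conf = new SparkConf()
  .setAppName("kafka-stream")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")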

Re: Spark Streaming concurrent calls

2019-08-15 Thread Tianlang

Hi

Could increasing the Kafka topic's partition count help?

On 2019/8/13 10:53 PM, Amit Sharma wrote:
I am using Kafka Spark Streaming. My UI application sends requests to the 
streaming job through Kafka. The problem is that the streaming job handles one 
request at a time, so if multiple users send requests at the same time, they 
have to wait until the earlier requests are done.

Is there any way it can handle multiple requests?


Thanks
Amit

--

TianlangStudio 


Re: help understanding physical plan

2019-08-15 Thread Tianlang

Hi,

Maybe you can look at the Spark UI. The physical plan has no timing 
information.


On 2019/8/13 10:45 PM, Marcelo Valle wrote:

Hi,

I have a job running on AWS EMR. It's basically a join between 2 
tables (parquet files on S3), one somewhat large (around 50 GB) and 
the other small (less than 1 GB).
The small table is the result of other operations, but it is a 
dataframe with `.persist(StorageLevel.MEMORY_AND_DISK_SER)`, and the 
count on this dataframe finishes quickly.
When I run my "LEFT_ANTI" join, I get the execution plan below. 
While most of my jobs on large amounts of data take at most 1 h on this 
cluster, this one takes almost 1 day to complete.


What could I be doing wrong? I am trying to analyze the plan, but I 
can't find anything that justifies the slowness. It has 2 shuffles 
followed by a zip, but other jobs have similar things and they are not 
that slow.


Could anyone point me to possible actions I could take to investigate 
this?


Thanks,
Marcelo.

== Physical Plan ==
*(2) Project [USAGE_AGGREGATED_METADATA_ID#1493, 
SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702, 
USAGE_AGGREGATED_METADATA_HASH#1513]
+- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, 
), coalesce(SENDER_RECORDING_IDENTIFIER#1499, )], 
[coalesce(USAGE_AGGREGATED_METADATA_ID#356, ), 
coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight, 
((USAGE_AGGREGATED_METADATA_ID#356 <=> 
USAGE_AGGREGATED_METADATA_ID#1493) && (SENDER_RECORDING_IDENTIFIER#357 
<=> SENDER_RECORDING_IDENTIFIER#1499))
   :- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493, 
SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
   :     +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493, 
ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, 
RECORDING_DISPLAY_ARTIST#1497, WORK_WRITERS#1498, 
SENDER_RECORDING_IDENTIFIER#1499, RECORDING_VERSION_TITLE#1500, 
WORK_TITLE#1501, CONTENT_TYPE#1502, 
USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 
replicas)
   :           +- *(2) Project [ID#328 AS 
USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS 
ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS 
RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS 
WORK_WRITERS#1498, uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null 
AS RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS 
CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null, 
artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS 
USAGE_AGGREGATED_METADATA_HASH#1513]
   :              +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ), 
coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ), 
coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )], 
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, 
), coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], 
Inner, BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> 
iswc_1#1420)) && (track_name#291 <=> track_name_1#1421)) && 
(artist_name#292 <=> artist_name_1#1422)) && (work_writer_names#293 
<=> work_writer_names_1#1423))
   :                 :- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), 
coalesce(input[2, string, true], ), coalesce(input[3, string, true], 
), coalesce(input[4, string, true], ), coalesce(input[5, string, 
true], )))
   :                 :  +- *(1) Project [ID#328, isrc#289 AS 
isrc_1#1419, iswc#290 AS iswc_1#1420, track_name#291 AS 
track_name_1#1421, artist_name#292 AS artist_name_1#1422, 
work_writer_names#293 AS work_writer_names_1#1423]

   :                 :     +- *(1) Filter isnotnull(ID#328)
   :                 :        +- InMemoryTableScan [ID#328, 
artist_name#292, isrc#289, iswc#290, track_name#291, 
work_writer_names#293], [isnotnull(ID#328)]
   :                 :              +- InMemoryRelation [ID#328, 
isrc#289, iswc#290, track_name#291, artist_name#292, 
work_writer_names#293], StorageLevel(disk, memory, 1 replicas)
   :                 :                    +- *(2) Project [ID#328, 
isrc#289, iswc#290, track_name#291, artist_name#292, 
work_writer_names#293]
   :                 :                       +- *(2) BroadcastHashJoin 
[coalesce(ISRC#329, ), coalesce(ISWC#330, ), 
coalesce(RECORDING_TITLE#331, ), 
coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, 
)], [coalesce(isrc#289, ), coalesce(iswc#290, ), 
coalesce(track_name#291, ), coalesce(substring(artist_name#292, 0, 
1000), ), coalesce(work_writer_names#293, )], RightOuter, BuildLeft, 
(isrc#289 <=> ISRC#329) && (iswc#290 <=> ISWC#330)) && 
(track_name#291 <=> RECORDING_TITLE#331)) && 
(substring(artist_name#292, 0, 1000) <=> 
RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=> 
WORK_WRITERS#333))
   :                 :                          :- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), 
coalesce(input[2, string, true], ),
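
One generic thing to try while investigating (a sketch under assumptions, since the 
plan above already shows a BroadcastHashJoin): make the broadcast of the small side 
explicit, so the anti-join cannot silently fall back to a shuffle if the size 
estimate changes. The paths below are placeholders; the join columns are taken from 
the plan.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("anti-join-check").getOrCreate()

// Hypothetical inputs standing in for the two tables in the question.
val large = spark.read.parquet("s3://bucket/large_table")   // ~50 GB side
val small = spark.read.parquet("s3://bucket/small_table")   // <1 GB side

// Explicit broadcast hint for the small side of the LEFT_ANTI join.
val result = large.join(
  broadcast(small),
  Seq("USAGE_AGGREGATED_METADATA_ID", "SENDER_RECORDING_IDENTIFIER"),
  "left_anti")

Beyond that, the Spark UI's stage timeline and task metrics (as suggested above) 
should show whether the time goes into recomputing the persisted small side, into 
the broadcast itself, or into skewed tasks on the large side.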