Re: Serialization or internal functions?

2020-04-09 Thread Vadim Semenov
You can take a look at the code that Spark generates:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.debug.codegenString

val spark: SparkSession
import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq("A", "b", "c").toDF("col")
data.write.parquet("/tmp/data")

val df = spark.read.parquet("/tmp/data")

// built-in functions: the whole expression stays in the Tungsten/UTF8String domain
val df1 = df
  .withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat")))
  .select("valueconcat")
println(codegenString(df1.queryExecution.executedPlan))

// typed map: forces deserialization to JVM objects and serialization back
val df2 = df.map(e => s"$e concat")
println(codegenString(df2.queryExecution.executedPlan))


It shows that for df1 Spark internally uses
org.apache.spark.unsafe.types.UTF8String#concat, whereas df2 requires
deserialization and serialization around the map function.

Using Spark's native functions is, in most cases, the most effective
option in terms of performance.
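As an aside on the follow-up question quoted below (how to reference the single
column of a Dataset[String] without col(data.columns.head)): the string encoder
names that column "value" by default, which the LocalTableScan [value#12] in the
quoted plan also shows. A minimal sketch, assuming the default name hasn't been
overridden and reusing the imports from the snippet above:

```
val ds = Seq("A", "b", "c").toDS()   // the single column is named "value" by default
ds.select(concat($"value", lit(" "), lit("concat")).as("valueconcat")).explain()
```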

On Sat, Apr 4, 2020 at 2:07 PM  wrote:
>
> Dear Community,
>
>
>
> Recently, I had to solve the following problem: “for every entry of a
> Dataset[String], concat a constant value”, and to solve it I used built-in
> functions:
>
>
>
> val data = Seq("A","b","c").toDS
>
>
>
> scala> data.withColumn("valueconcat", concat(col(data.columns.head), lit(" "), lit("concat"))).select("valueconcat").explain()
>
> == Physical Plan ==
>
> LocalTableScan [valueconcat#161]
>
>
>
> As an alternative, a much simpler version of the program is to use map, but
> it adds a serialization step that does not seem to be present for the version
> above:
>
>
>
> scala> data.map(e=> s"$e concat").explain
>
> == Physical Plan ==
>
> *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#92]
> +- *(1) MapElements <function1>, obj#91: java.lang.String
>    +- *(1) DeserializeToObject value#12.toString, obj#90: java.lang.String
>       +- LocalTableScan [value#12]
>
>
>
> Is this over-optimization or is this the right way to go?
>
>
>
> As a follow up , is there any better API to get the one and only column 
> available in a DataSet[String] when using built-in functions? 
> “col(data.columns.head)” works but it is not ideal.
>
>
>
> Thanks!



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-11 Thread Vadim Semenov
There's an umbrella ticket for various 2GB limitations
https://issues.apache.org/jira/browse/SPARK-6235

On Fri, Nov 8, 2019 at 4:11 PM Jacob Lynn  wrote:
>
> Sorry for the noise, folks! I understand that reducing the number of 
> partitions works around the issue (at the scale I'm working at, anyway) -- as 
> I mentioned in my initial email -- and I understand the root cause. I'm not 
> looking for advice on how to resolve my issue. I'm just pointing out that 
> this is a real bug/limitation that impacts real-world use cases, in case 
> there is some proper Spark dev out there who is looking for a problem to 
> solve.
>
> On Fri, Nov 8, 2019 at 2:24 PM Vadim Semenov  
> wrote:
>>
>> Basically, the driver tracks the map outputs of all partitions and sends
>> them over to executors, so what it's trying to do is serialize and compress
>> that map, but because it's so big it goes over 2GiB, which is Java's limit
>> on the max size of a byte array, so the whole thing fails.
>>
>> The size of the data doesn't matter much here; the number of partitions is
>> the root cause of the issue. Try reducing it below 3 and
>> see how it goes.
>>
>> On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:
>> >
>> > Hi,
>> >
>> > We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge 
>> > (60 GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>> >
>> > It processes ~40 TB of data using aggregateByKey in which we specify 
>> > numPartitions = 300,000.
>> > Map side tasks succeed, but reduce side tasks all fail.
>> >
>> > We notice the following driver error:
>> >
>> > 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>> >
>> >  java.lang.OutOfMemoryError
>> >
>> > at 
>> > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> > at 
>> > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> > at 
>> > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> > at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
>> > at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
>> > at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
>> > at 
>> > org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
>> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
>> > at 
>> > org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
>> > at 
>> > org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
>> > at 
>> > org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
>> > at 
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> > at 
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> > at java.lang.Thread.run(Thread.java:748)
>> > Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
>> > at 
>> > java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
>> > at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
>> > at 
>> > java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>> > at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
>> > at 
>> > java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
>> > at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
>> > at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
>> > at 
>> > java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)

Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2019-11-08 Thread Vadim Semenov
Basically, the driver tracks the map outputs of all partitions and sends
them over to executors, so what it's trying to do is serialize and compress
that map, but because it's so big it goes over 2GiB, which is Java's limit
on the max size of a byte array, so the whole thing fails.

The size of the data doesn't matter much here; the number of partitions is
the root cause of the issue. Try reducing it below 3 and
see how it goes.

On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:
>
> Hi,
>
> We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge (60 
> GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>
> It processes ~40 TB of data using aggregateByKey in which we specify 
> numPartitions = 300,000.
> Map side tasks succeed, but reduce side tasks all fail.
>
> We notice the following driver error:
>
> 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>
>  java.lang.OutOfMemoryError
>
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Suppressed: java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)

Re: intermittent Kryo serialization failures in Spark

2019-09-18 Thread Vadim Semenov
I remember it not working for us when we were setting it from inside the
application; we needed to actually pass it in.

On Wed, Sep 18, 2019 at 10:38 AM Jerry Vinokurov 
wrote:

> Hi Vadim,
>
> Thanks for your suggestion. We do preregister the classes, like so:
>
> object KryoRegistrar {
>>
>>   val classesToRegister: Array[Class[_]] = Array(
>>     classOf[MyModel],
>>     [etc]
>>   )
>> }
>>
>
> And then we do:
>
> val sparkConf = new SparkConf()
>>   .registerKryoClasses(KryoRegistrar.classesToRegister)
>>
>
>  I notice that this is a bit different from your code and I'm wondering
> whether there's any functional difference or if these are two ways to get
> to the same end. Our code is directly adapted from the Spark documentation
> on how to use the Kryo serializer but maybe there's some subtlety here that
> I'm missing.
>
> With regard to the settings, it looks like we currently have the default
> settings, which is to say that referenceTracking is true,
> registrationRequired is false, unsafe is false, and buffer.max is 64m (none
> of our objects are anywhere near that size but... who knows). I will try it
> with your suggestions and see if it solves the problem.
>
> thanks,
> Jerry
>
> On Tue, Sep 17, 2019 at 4:34 PM Vadim Semenov  wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>> kryo.register(Class.forName("[[B")) // byte[][]
>> kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>>at 
>>>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>>at 
>>>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>>at 
>>>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>>at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>>at 
>>>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>>at 
>>>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>>at 
>>>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>>at 
>>>> or

Re: intermittent Kryo serialization failures in Spark

2019-09-17 Thread Vadim Semenov
Pre-register your classes:

```
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
kryo.register(Class.forName("[[B")) // byte[][]
kryo.register(classOf[java.lang.Class[_]])
  }
}
```

then run with

'spark.kryo.referenceTracking': 'false',
'spark.kryo.registrationRequired': 'false',
'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
'spark.kryo.unsafe': 'false',
'spark.kryoserializer.buffer.max': '256m',
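For completeness, a minimal sketch of setting the same properties programmatically
on a SparkConf before the SparkContext is created. Note that the 2019-09-18
follow-up earlier in this digest mentions these sometimes have to be passed at
submit time rather than set from inside the application; the registrator class
name is the example one from this thread:

```
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.datadog.spark.MyKryoRegistrator")
  .set("spark.kryo.referenceTracking", "false")
  .set("spark.kryo.registrationRequired", "false")
  .set("spark.kryo.unsafe", "false")
  .set("spark.kryoserializer.buffer.max", "256m")

val spark = SparkSession.builder.config(conf).getOrCreate()
```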

On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov 
wrote:

> Hi folks,
>
> Posted this some time ago but the problem continues to bedevil us. I'm
> including a (slightly edited) stack trace that results from this error. If
> anyone can shed any light on what exactly is happening here and what we can
> do to avoid it, that would be much appreciated.
>
> org.apache.spark.SparkException: Failed to register classes with Kryo
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>  at 
>> org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>  at 
>> org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>  at 
>> org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>  at 
>> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>  at 
>> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>  at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>  at 
>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>  at 
>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>  at 
>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>  at 
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>  at 
>> org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>  at 
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>  at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>  at 
>> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>  at 
>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>  at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>  at 
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>  at 
>> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>  at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>  at 
>> 

Re: EMR Spark 2.4.3 executor hang

2019-09-03 Thread Vadim Semenov
Try "spark.shuffle.io.numConnectionsPerPeer=10"

On Fri, Aug 30, 2019 at 10:22 AM Daniel Zhang  wrote:

> Hi, All:
> We are testing EMR and comparing it with our on-premise HDP solution. We
> use one application as the test:
> EMR (5.21.1) with Hadoop 2.8.5 + Spark 2.4.3 vs HDP (2.6.3) with Hadoop
> 2.7.3 + Spark 2.2.0
> The application is very simple, just read Parquet raw file, then do a
> DS.repartition(id_col).flatMap().write.partitionBy(col).save() operation.
>
> For the testing data on HDP with 6 slave nodes (32G each), the whole
> application can finish around 3 hours. We are fine with it.
> This application will run a Spark application with 2 stages. The 2nd stage
> will run with 200 tasks as default.
> On EMR, we observed that 2 of the 200 tasks hang for more than 10 hours
> while the rest are done, so we had to give up.
>
> The first test is to read the raw parquet file from S3 and use AWS S3 as
> the output directly. So I think it could be some issue with S3 output
> committer. So we change the test to read parquet file from S3 and use EMR
> HDFS as the output location.
> To my surprise, we observed the same behavior using HDFS: 2 of 200 tasks
> hang forever, and they are on different executors. These 2 executors process
> other tasks normally but just hang on these 2 tasks, while all
> the rest finished.
>
> This looks like data skew, but we know it is not. As the same application
> and the same data work fine on HDP, and we saw well-balanced data across
> all 200 tasks.
>
> Now I have checked the executor logs on EMR more carefully for the HDFS
> test case, and I know S3 is not an issue here, as all the parquet raw
> data is read in the first stage of the job WITHOUT any delay.
>
> Sample log from the finished executor on EMR:
> *19/08/29 20:18:49 INFO Executor: Finished task 157.0 in stage 2.0 (TID
> 170). 3854 bytes result sent to driver*
> *19/08/29 20:18:49 INFO CoarseGrainedExecutorBackend: Got assigned task
> 179*
> *19/08/29 20:18:49 INFO Executor: Running task 166.0 in stage 2.0 (TID
> 179)*
> *19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Getting 12 non-empty
> blocks including 1 local blocks and 11 remote blocks*
> *19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection
> to ip-10-51-51-147.ec2.internal/10.51.51.147:7337
> , creating a new one.*
> *19/08/29 20:18:49 INFO TransportClientFactory: Successfully created
> connection to ip-10-51-51-147.ec2.internal/10.51.51.147:7337
>  after 0 ms (0 ms spent in bootstraps)*
> *19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection
> to ip-10-51-51-141.ec2.internal/10.51.51.141:7337
> , creating a new one.*
> *19/08/29 20:18:49 INFO TransportClientFactory: Successfully created
> connection to ip-10-51-51-141.ec2.internal/10.51.51.141:7337
>  after 0 ms (0 ms spent in bootstraps)*
> *19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection
> to ip-10-51-51-155.ec2.internal/10.51.51.155:7337
> , creating a new one.*
> *19/08/29 20:18:49 INFO TransportClientFactory: Successfully created
> connection to ip-10-51-51-155.ec2.internal/10.51.51.155:7337
>  after 0 ms (0 ms spent in bootstraps)*
> *19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection
> to ip-10-51-51-142.ec2.internal/10.51.51.142:7337
> , creating a new one.*
> *19/08/29 20:18:49 INFO TransportClientFactory: Successfully created
> connection to ip-10-51-51-142.ec2.internal/10.51.51.142:7337
>  after 0 ms (0 ms spent in bootstraps)*
> *19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection
> to ip-10-51-51-140.ec2.internal/10.51.51.140:7337
> , creating a new one.*
> *19/08/29 20:18:49 INFO TransportClientFactory: Successfully created
> connection to ip-10-51-51-140.ec2.internal/10.51.51.140:7337
>  after 0 ms (0 ms spent in bootstraps)*
> *19/08/29 20:18:49 INFO TransportClientFactory: Found inactive connection
> to ip-10-51-51-157.ec2.internal/10.51.51.157:7337
> , creating a new one.*
> *19/08/29 20:18:49 INFO TransportClientFactory: Successfully created
> connection to ip-10-51-51-157.ec2.internal/10.51.51.157:7337
>  after 0 ms (0 ms spent in bootstraps)*
> *19/08/29 20:18:49 INFO ShuffleBlockFetcherIterator: Started 11 remote
> fetches in 61 ms*
> *19/08/29 20:28:55 INFO FileOutputCommitter: File Output Committer
> Algorithm version is 1*
> .
>
> The last log from the hanging executor on EMR:
> *19/08/29 19:40:40 INFO Executor: Finished task 78.0 in stage 2.0 (TID
> 91). 3854 bytes result sent to driver*
> *19/08/29 19:40:40 INFO CoarseGrainedExecutorBackend: Got assigned task
> 101*
> *19/08/29 19:40:40 INFO Executor: Running task 88.0 in 

Re: Stream is corrupted in ShuffleBlockFetcherIterator

2019-08-16 Thread Vadim Semenov
This is what you're looking for:

Handle large corrupt shuffle blocks
https://issues.apache.org/jira/browse/SPARK-26089

So until 3.0 the only workaround I can think of is to reduce the size of the
shuffle blocks / split your job into many smaller ones.

On Thu, Aug 15, 2019 at 4:47 PM Mikhail Pryakhin 
wrote:

> Hello, Spark community!
>
> I've been struggling with my job, which constantly fails due to an inability
> to uncompress some previously compressed blocks while shuffling data.
> I use Spark 2.2.0 with all the configuration settings left at their defaults (no
> specific compression codec is specified). I've ascertained that
> LZ4CompressionCodec is used as the default codec. The job fails as soon as
> the limit of attempts is exceeded, with the following message:
>
> Caused by: java.io.IOException: Stream is corrupted
> at
> org.apache.spark.io.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:211)
> at
> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:125)
> at
> org.apache.spark.io.LZ4BlockInputStream.read(LZ4BlockInputStream.java:137)
> at
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
> ... 28 more
> Caused by: net.jpountz.lz4.LZ4Exception: Error decoding offset 14649 of
> input buffer
>
>
> Actually, I've stumbled upon a bug [1] that is not fixed yet. Any clue on how
> to work around this issue? I've tried the Snappy codec but it fails
> likewise, with a slightly different message:
>
> org.apache.spark.shuffle.FetchFailedException: failed to uncompress the
> chunk: FAILED_TO_UNCOMPRESS(5)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:442)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:403)
> at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
> at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
> at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: failed to uncompress the chunk:
> FAILED_TO_UNCOMPRESS(5)
> at
> org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:361)
> at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:158)
> at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
> at java.io.InputStream.read(InputStream.java:101)
> at
> org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:340)
> at 

Re: [Meta] Moderation request diversion?

2019-06-24 Thread Vadim Semenov
just set up a filter
[image: Screen Shot 2019-06-24 at 4.51.20 PM.png]

On Mon, Jun 24, 2019 at 4:46 PM Jeff Evans 
wrote:

> There seem to be a lot of people trying to unsubscribe via the main
> address, rather than following the instructions from the welcome
> email.  Of course, this is not all that surprising, but it leads to a
> lot of pointless threads*.  Is there a way to enable automatic
> detection and diversion of such requests?  I don't know what
> particular software is running this list, but mailman, for example,
> has such a capability.  I feel that such a thing would improve the
> signal-to-noise ratio of this valuable list.  Thanks.
>
> * see "unsubscribe" messages in the archive:
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone


Re: [Spark Core]: What is the release date for Spark 3 ?

2019-06-13 Thread Vadim Semenov
At the next Spark Summit.

On Thu, Jun 13, 2019 at 3:58 AM Alex Dettinger 
wrote:

> Follow up on the release date for Spark 3. Any guesstimate or rough
> estimation without commitment would be helpful :)
>
> Cheers,
> Alex
>
> On Mon, Jun 10, 2019 at 5:24 PM Alex Dettinger 
> wrote:
>
>> Hi guys,
>>
>>   I was not able to find the foreseen release date for Spark 3.
>>   Would one have any information on this please ?
>>
>> Many thanks,
>> Alex
>>
>

-- 
Sent from my iPhone


Re: Difference between Checkpointing and Persist

2019-04-18 Thread Vadim Semenov
saving/checkpointing would be preferable in case of a big data set because:

- the RDD gets saved to HDFS and the DAG gets truncated so if some
partitions/executors fail it won't result in recomputing everything

- you don't use memory for caching therefore the JVM heap is going to be
smaller which helps GC and overall there'll be more memory for other
operations

- by saving to HDFS you're removing potential hotspots, since partitions can
be fetched from many DataNodes; with caching, a hot partition that gets
requested a lot by other executors can end up overwhelming the executor
that holds it

> We save that intermediate RDD and perform join (using same RDD - saving
is to just persist intermediate result before joining)
Checkpointing is essentially saving the RDD and reading it back; however,
you can't reuse checkpointed data if the job failed, so it'd be nice to have
one part of the join explicitly saved in case of potential issues.

Overall, in my opinion, when working with big joins you should pay more
attention to reliability and fault tolerance than to pure speed, as the
probability of having issues grows with the dataset size and
cluster size.
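A minimal sketch of the three options being compared, using a toy key-value RDD
(the paths and parsing are placeholders, and `sc` is assumed to be an existing
SparkContext):

```
import org.apache.spark.storage.StorageLevel

sc.setCheckpointDir("hdfs:///tmp/checkpoints")  // required before checkpoint() can run

val big   = sc.textFile("hdfs:///data/big").map(line => (line.take(8), line))
val other = sc.textFile("hdfs:///data/other").map(line => (line.take(8), line))

// 1. checkpoint: written to HDFS, lineage truncated
big.checkpoint()

// 2. persist: kept on the executors, spilling to local disk when memory is short
// big.persist(StorageLevel.MEMORY_AND_DISK)

// 3. explicit save + re-read: survives even if this job fails
// big.saveAsObjectFile("hdfs:///tmp/intermediate")
// val reloaded = sc.objectFile[(String, String)]("hdfs:///tmp/intermediate")

val joined = big.join(other)
joined.count()  // the first action also materializes the checkpoint
```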

On Thu, Apr 18, 2019 at 1:49 PM Subash Prabakar 
wrote:

> Hi All,
>
> I have a doubt about checkpointing and persist/saving.
>
> Say we have one RDD - containing huge data,
> 1. We checkpoint and perform join
> 2. We persist as StorageLevel.MEMORY_AND_DISK and perform join
> 3. We save that intermediate RDD and perform join (using same RDD - saving
> is to just persist intermediate result before joining)
>
>
> Which of the above is faster and whats the difference?
>
>
> Thanks,
> Subash
>


-- 
Sent from my iPhone


Re: [SHUFFLE]FAILED_TO_UNCOMPRESS(5) errors when fetching shuffle data with sort-based shuffle

2019-03-12 Thread Vadim Semenov
We have seen this error before on 1.6, but ever since we upgraded to 2.1
two years ago we haven't seen it.

On Tue, Mar 12, 2019 at 2:19 AM wangfei  wrote:

> Hi all,
>  Non-deterministic FAILED_TO_UNCOMPRESS(5) or 'Stream is corrupted' errors
> may occur during shuffle read, as described in this JIRA (
> https://issues.apache.org/jira/browse/SPARK-4105).
>  There have been no new comments in this JIRA for a long time. So, has
> anyone seen these errors in a recent version, such as Spark 2.3?
>  Can anyone provide a reproducible case or analyze the cause of these
> errors?
>  Thanks.
>


-- 
Sent from my iPhone


Re: Difference between dataset and dataframe

2019-02-19 Thread Vadim Semenov
>
> 1) Is there any difference in terms of performance when we use datasets over
> dataframes? Is it significant to choose one over the other? I do realise there
> would be some overhead due to case classes, but how significant is that? Are
> there any other implications?


As long as you use the DataFrame functions the performance is going to be
the same, since they operate directly on Tungsten rows, but as soon as you
try to do any typed operations like `.map`, performance takes a hit
because Spark has to create Java objects from Tungsten memory.
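A small sketch of that difference: an untyped Column-based filter stays within
Tungsten, while a typed lambda forces the rows to be deserialized into JVM objects
first (the exact plans vary across Spark versions, and the case class is made up):

```
import spark.implicits._

case class Record(id: Long, amount: Double)
val ds = Seq(Record(1L, 10.0), Record(2L, 99.5)).toDS()

// untyped: expressed as Catalyst expressions, operates on Tungsten rows directly
ds.filter($"amount" > 50).explain()

// typed: the lambda is opaque to Catalyst, so rows get deserialized into Record objects
ds.filter(r => r.amount > 50).explain()
```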

2) Is the Tungsten code generation done only for datasets, or is there any
> internal process to generate bytecode for dataframes as well? Since it's
> related to the JVM, I think it's just for datasets, but I couldn't find anything
> that says so specifically. If it's just for datasets, does that mean we
> miss out on the Project Tungsten optimisation for dataframes?


Code generation is done for both



On Mon, Feb 18, 2019 at 9:09 PM Akhilanand  wrote:

>
> Hello,
>
> I have been recently exploring about dataset and dataframes. I would
> really appreciate if someone could answer these questions:
>
> 1) Is there any difference in terms of performance when we use datasets over
> dataframes? Is it significant to choose one over the other? I do realise there
> would be some overhead due to case classes, but how significant is that? Are
> there any other implications?
>
> 2) Is the Tungsten code generation done only for datasets, or is there any
> internal process to generate bytecode for dataframes as well? Since it's
> related to the JVM, I think it's just for datasets, but I couldn't find anything
> that says so specifically. If it's just for datasets, does that mean we
> miss out on the Project Tungsten optimisation for dataframes?
>
>
>
> Regards,
> Akhilanand BV
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone


Re: "where" clause able to access fields not in its schema

2019-02-13 Thread Vadim Semenov
Yeah, the filter gets pushed in front of the select after analysis:

scala> b.where($"bar" === 20).explain(true)
== Parsed Logical Plan ==
'Filter ('bar = 20)
+- AnalysisBarrier
  +- Project [foo#6]
 +- Project [_1#3 AS foo#6, _2#4 AS bar#7]
+- SerializeFromObject [assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._1 AS _1#3, assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._2 AS _2#4]
   +- ExternalRDD [obj#2]

== Analyzed Logical Plan ==
foo: int
Project [foo#6]
+- Filter (bar#7 = 20)
   +- Project [foo#6, bar#7]
  +- Project [_1#3 AS foo#6, _2#4 AS bar#7]
 +- SerializeFromObject [assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._1 AS _1#3, assertnotnull(assertnotnull(input[0,
scala.Tuple2, true]))._2 AS _2#4]
+- ExternalRDD [obj#2]

== Optimized Logical Plan ==
Project [_1#3 AS foo#6]
+- Filter (_2#4 = 20)
   +- SerializeFromObject [assertnotnull(input[0, scala.Tuple2, true])._1
AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
  +- ExternalRDD [obj#2]

== Physical Plan ==
*(1) Project [_1#3 AS foo#6]
+- *(1) Filter (_2#4 = 20)
   +- *(1) SerializeFromObject [assertnotnull(input[0, scala.Tuple2,
true])._1 AS _1#3, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#4]
  +- Scan ExternalRDDScan[obj#2]
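A minimal reconstruction of the example (the actual values are assumed, only the
schema matters): the `where` above takes a Column expression that the analyzer can
resolve against the child plan and push below the projection, while a typed
`filter` lambda only ever sees the projected schema and therefore fails:

```
import spark.implicits._

val a = Seq((10, 10), (20, 20)).toDF("foo", "bar")
val b = a.select("foo")

b.where($"bar" === 20).show()   // works: "bar" is resolved against the child plan

// fails with java.lang.IllegalArgumentException: Field "bar" does not exist
// b.filter(row => row.getAs[Int]("bar") == 20).show()
```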

On Wed, Feb 13, 2019 at 8:04 PM Yeikel  wrote:

> This is indeed strange. To add to the question, I can see that if I use a
> filter I get an exception (as expected), so I am not sure what's the
> difference between the where clause and filter:
>
>
> b.filter(s => {
>   val bar: String = s.getAs("bar")
>   bar.equals("20")
> }).show
>
> * java.lang.IllegalArgumentException: Field "bar" does not exist.*
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone


Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-12 Thread Vadim Semenov
Yeah, then the easiest option would be to fork Spark and run your job using the
forked version; in the case of YARN it should be pretty easy to do.

git clone https://github.com/apache/spark.git

cd spark

export MAVEN_OPTS="-Xmx4g -XX:ReservedCodeCacheSize=512m"

./build/mvn -DskipTests clean package

./dev/make-distribution.sh --name custom-spark --tgz -Phadoop-2.7 -Phive -Pyarn

ls -la spark-2.4.0-SNAPSHOT-bin-custom-spark.tgz

scp spark-2.4.0-SNAPSHOT-bin-custom-spark.tgz cluster:/tmp

# on the cluster: unpack the distribution and reuse the existing cluster config
tar -xzf /tmp/spark-2.4.0-SNAPSHOT-bin-custom-spark.tgz -C /tmp
export SPARK_HOME="/tmp/spark-2.4.0-SNAPSHOT-bin-custom-spark"

cd $SPARK_HOME
mv conf conf.new
ln -s /etc/spark/conf conf

echo $SPARK_HOME
spark-submit --version

On Tue, Feb 12, 2019 at 6:40 AM Serega Sheypak 
wrote:
>
> I tried a similar approach, it works well for user functions, but I need
to crash tasks or executors when the Spark application runs "repartition". I
didn't find any way to inject a "poison pill" into the repartition call :(
>
> пн, 11 февр. 2019 г. в 21:19, Vadim Semenov :
>>
>> something like this
>>
>> import org.apache.spark.TaskContext
>> ds.map(r => {
>>   val taskContext = TaskContext.get()
>>   if (taskContext.partitionId == 1000) {
>> throw new RuntimeException
>>   }
>>   r
>> })
>>
>> On Mon, Feb 11, 2019 at 8:41 AM Serega Sheypak 
wrote:
>> >
>> > I need to crash a task which does repartition.
>> >
>> > On Mon, Feb 11, 2019 at 10:37, Gabor Somogyi :
>> >>
>> >> What blocks you to put if conditions inside the mentioned map
function?
>> >>
>> >> On Mon, Feb 11, 2019 at 10:31 AM Serega Sheypak <
serega.shey...@gmail.com> wrote:
>> >>>
>> >>> Yeah, but I don't need to crash entire app, I want to fail several
tasks or executors and then wait for completion.
>> >>>
>> >>> On Sun, Feb 10, 2019 at 21:49, Gabor Somogyi <
gabor.g.somo...@gmail.com>:
>> >>>>
>> >>>> Another approach is adding artificial exception into the
application's source code like this:
>> >>>>
>> >>>> val query = input.toDS.map(_ /
0).writeStream.format("console").start()
>> >>>>
>> >>>> G
>> >>>>
>> >>>>
>> >>>> On Sun, Feb 10, 2019 at 9:36 PM Serega Sheypak <
serega.shey...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hi BR,
>> >>>>> thanks for your reply. I want to mimic the issue and kill tasks at
a certain stage. Killing executor is also an option for me.
>> >>>>> I'm curious how do core spark contributors test spark fault
tolerance?
>> >>>>>
>> >>>>>
>> >>>>> On Sun, Feb 10, 2019 at 16:57, Gabor Somogyi <
gabor.g.somo...@gmail.com>:
>> >>>>>>
>> >>>>>> Hi Serega,
>> >>>>>>
>> >>>>>> If I understand your problem correctly you would like to kill one
executor only and the rest of the app has to be untouched.
>> >>>>>> If that's true yarn -kill is not what you want because it stops
the whole application.
>> >>>>>>
>> >>>>>> I've done similar thing when tested/testing Spark's HA features.
>> >>>>>> - jps -vlm | grep
"org.apache.spark.executor.CoarseGrainedExecutorBackend.*applicationid"
>> >>>>>> - kill -9 pidofoneexecutor
>> >>>>>>
>> >>>>>> Be aware if it's a multi-node cluster check whether at least one
process runs on a specific node(it's not required).
>> >>>>>> Happy killing...
>> >>>>>>
>> >>>>>> BR,
>> >>>>>> G
>> >>>>>>
>> >>>>>>
>> >>>>>> On Sun, Feb 10, 2019 at 4:19 PM Jörn Franke 
wrote:
>> >>>>>>>
>> >>>>>>> yarn application -kill applicationid ?
>> >>>>>>>
>> >>>>>>> > On Feb 10, 2019, at 13:30, Serega Sheypak <
serega.shey...@gmail.com> wrote:
>> >>>>>>> >
>> >>>>>>> > Hi there!
>> >>>>>>> > I have weird issue that appears only when tasks fail at
specific stage. I would like to imitate failure on my own.
>> >>>>>>> > The plan is to run problematic app and then kill entire
executor or some tasks when execution reaches certain stage.
>> >>>>>>> >
>> >>>>>>> > Is it do-able?
>> >>>>>>>
>> >>>>>>>
-
>> >>>>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>>>>>>
>>
>>
>> --
>> Sent from my iPhone



-- 
Sent from my iPhone


Re: Spark on YARN, HowTo kill executor or individual task?

2019-02-11 Thread Vadim Semenov
something like this

import org.apache.spark.TaskContext
ds.map(r => {
  val taskContext = TaskContext.get()
  if (taskContext.partitionId == 1000) {
throw new RuntimeException
  }
  r
})
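A slightly extended sketch of the same idea: also check the attempt number so the
failure is injected only once and retries let the job complete (the partition id
and message are arbitrary, and `ds` is the dataset from the snippet above):

```
import org.apache.spark.TaskContext

ds.map { r =>
  val tc = TaskContext.get()
  // fail partition 1000 only on its first attempt; the retried task will succeed
  if (tc.partitionId == 1000 && tc.attemptNumber == 0) {
    throw new RuntimeException("injected failure for fault-tolerance testing")
  }
  r
}
```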

On Mon, Feb 11, 2019 at 8:41 AM Serega Sheypak  wrote:
>
> I need to crash a task which does repartition.
>
> On Mon, Feb 11, 2019 at 10:37, Gabor Somogyi :
>>
>> What blocks you to put if conditions inside the mentioned map function?
>>
>> On Mon, Feb 11, 2019 at 10:31 AM Serega Sheypak  
>> wrote:
>>>
>>> Yeah, but I don't need to crash entire app, I want to fail several tasks or 
>>> executors and then wait for completion.
>>>
>>> On Sun, Feb 10, 2019 at 21:49, Gabor Somogyi :

 Another approach is adding artificial exception into the application's 
 source code like this:

 val query = input.toDS.map(_ / 0).writeStream.format("console").start()

 G


 On Sun, Feb 10, 2019 at 9:36 PM Serega Sheypak  
 wrote:
>
> Hi BR,
> thanks for your reply. I want to mimic the issue and kill tasks at a 
> certain stage. Killing executor is also an option for me.
> I'm curious how do core spark contributors test spark fault tolerance?
>
>
> On Sun, Feb 10, 2019 at 16:57, Gabor Somogyi :
>>
>> Hi Serega,
>>
>> If I understand your problem correctly you would like to kill one 
>> executor only and the rest of the app has to be untouched.
>> If that's true yarn -kill is not what you want because it stops the 
>> whole application.
>>
>> I've done similar thing when tested/testing Spark's HA features.
>> - jps -vlm | grep 
>> "org.apache.spark.executor.CoarseGrainedExecutorBackend.*applicationid"
>> - kill -9 pidofoneexecutor
>>
>> Be aware if it's a multi-node cluster check whether at least one process 
>> runs on a specific node(it's not required).
>> Happy killing...
>>
>> BR,
>> G
>>
>>
>> On Sun, Feb 10, 2019 at 4:19 PM Jörn Franke  wrote:
>>>
>>> yarn application -kill applicationid ?
>>>
>>> > On Feb 10, 2019, at 13:30, Serega Sheypak 
>>> > wrote:
>>> >
>>> > Hi there!
>>> > I have weird issue that appears only when tasks fail at specific 
>>> > stage. I would like to imitate failure on my own.
>>> > The plan is to run problematic app and then kill entire executor or 
>>> > some tasks when execution reaches certain stage.
>>> >
>>> > Is it do-able?
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Job hangs in blocked task in final parquet write stage

2018-11-27 Thread Vadim Semenov
Hey Conrad,

has it started happening recently?

We recently started having some sporadic problems with drivers on EMR
getting stuck; up until two weeks ago everything was fine.
We're trying to figure out with the EMR team where the issue is coming from.
On Tue, Nov 27, 2018 at 6:29 AM Conrad Lee  wrote:
>
> Dear spark community,
>
> I'm running spark 2.3.2 on EMR 5.19.0.  I've got a job that's hanging in the 
> final stage--the job usually works, but I see this hanging behavior in about 
> one out of 50 runs.
>
> The second-to-last stage sorts the dataframe, and the final stage writes the 
> dataframe to HDFS.
>
> Here you can see the executor logs, which indicate that it has finished 
> processing the task.
>
> Here you can see the thread dump from the executor that's hanging.  Here's 
> the text of the blocked thread.
>
> I tried to work around this problem by enabling speculation, but speculative 
> execution never takes place.  I don't know why.
>
> Can anyone here help me?
>
> Thanks,
> Conrad



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark DataSets and multiple write(.) calls

2018-11-19 Thread Vadim Semenov
You can use checkpointing: Spark will write out the RDD to whatever
checkpoint location you specify, and the RDD can then be reused from the
checkpointed state, avoiding recomputation.
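A minimal sketch for the batch case (paths, column names, and the aggregation are
placeholders): compute the expensive shared part once, checkpoint it, and then run
the independent writes against the checkpointed Dataset.

```
import org.apache.spark.sql.functions._
import spark.implicits._

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

// the compute-intensive part shared by all outputs
val common = spark.read.parquet("hdfs:///data/input")
  .groupBy("key")
  .agg(sum("amount").as("total"))
  .checkpoint()   // eager by default: materialized once, lineage truncated

// each write reuses the checkpointed result instead of recomputing the aggregation
common.write.parquet("hdfs:///out/full")
common.filter($"total" > 100).write.parquet("hdfs:///out/large")
```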

On Mon, Nov 19, 2018 at 7:51 AM Dipl.-Inf. Rico Bergmann <
i...@ricobergmann.de> wrote:

> Thanks for your advice. But I'm using batch processing. Does anyone have a
> solution for the batch processing case?
>
> Best,
>
> Rico.
>
> On Nov 19, 2018 at 09:43, Magnus Nilsson wrote:
>
>
> I had the same requirements. As far as I know the only way is to extend
> the foreachwriter, cache the microbatch result and write to each output.
>
> https://docs.databricks.com/spark/latest/structured-streaming/foreach.html
>
> Unfortunately it seems as if you have to make a new connection "per batch"
> instead of creating one long lasting connections for the pipeline as such.
> Ie you might have to implement some sort of connection pooling by yourself
> depending on sink.
>
> Regards,
>
> Magnus
>
>
> On Mon, Nov 19, 2018 at 9:13 AM Dipl.-Inf. Rico Bergmann <
> i...@ricobergmann.de> wrote:
>
>> Hi!
>>
>> I have a SparkSQL programm, having one input and 6 ouputs (write). When
>> executing this programm every call to write(.) executes the plan. My
>> problem is, that I want all these writes to happen in parallel (inside
>> one execution plan), because all writes have a common and compute
>> intensive subpart, that can be shared by all plans. Is there a
>> possibility to do this? (Caching is not a solution because the input
>> dataset is way to large...)
>>
>> Hoping for advice ...
>>
>> Best, Rico B.
>>
>>
>> ---
>> This e-mail has been checked for viruses by Avast antivirus software.
>> https://www.avast.com/antivirus
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-- 
Sent from my iPhone


Re: Does Spark have a plan to move away from sun.misc.Unsafe?

2018-10-25 Thread Vadim Semenov
Here you go:
the umbrella ticket:
https://issues.apache.org/jira/browse/SPARK-24417

and the sun.misc.unsafe one
https://issues.apache.org/jira/browse/SPARK-24421
On Wed, Oct 24, 2018 at 8:08 PM kant kodali  wrote:
>
> Hi All,
>
> Does Spark have a plan to move away from sun.misc.Unsafe to VarHandles? I am 
> trying to find a JIRA issue for this?
>
> Thanks!



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Driver OutOfMemoryError in MapOutputTracker$.serializeMapStatuses for 40 TB shuffle.

2018-09-07 Thread Vadim Semenov
You have too many partitions, so when the driver tries to gather
the statuses of all map outputs and send them back to executors, it chokes on
the size of the structure that needs to be GZipped, and since it's
bigger than 2GiB, it produces an OOM.
On Fri, Sep 7, 2018 at 10:35 AM Harel Gliksman  wrote:
>
> Hi,
>
> We are running a Spark (2.3.1) job on an EMR cluster with 500 r3.2xlarge (60 
> GB, 8 vcores, 160 GB SSD ). Driver memory is set to 25GB.
>
> It processes ~40 TB of data using aggregateByKey in which we specify 
> numPartitions = 300,000.
> Map side tasks succeed, but reduce side tasks all fail.
>
> We notice the following driver error:
>
> 18/09/07 13:35:03 WARN Utils: Suppressing exception in finally: null
>
>  java.lang.OutOfMemoryError
>
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.flush(ObjectOutputStream.java:1822)
> at java.io.ObjectOutputStream.flush(ObjectOutputStream.java:719)
> at java.io.ObjectOutputStream.close(ObjectOutputStream.java:740)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$2.apply$mcV$sp(MapOutputTracker.scala:790)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1389)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "map-output-dispatcher-0" java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at java.util.zip.GZIPOutputStream.write(GZIPOutputStream.java:145)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.writeBlockHeader(ObjectOutputStream.java:1894)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1875)
> at 
> java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
> at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply$mcV$sp(MapOutputTracker.scala:787)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at 
> org.apache.spark.MapOutputTracker$$anonfun$serializeMapStatuses$1.apply(MapOutputTracker.scala:786)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> at 
> org.apache.spark.MapOutputTracker$.serializeMapStatuses(MapOutputTracker.scala:789)
> at 
> org.apache.spark.ShuffleStatus.serializedMapStatus(MapOutputTracker.scala:174)
> at 
> org.apache.spark.MapOutputTrackerMaster$MessageLoop.run(MapOutputTracker.scala:397)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Suppressed: java.lang.OutOfMemoryError
> at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
> at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
> at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:253)
> at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
> at 

Re: java.lang.IndexOutOfBoundsException: len is negative - when data size increases

2018-08-16 Thread Vadim Semenov
one of the spills becomes bigger than 2GiB and can't be loaded fully
(as arrays in Java can't have more than 2^31 - 1 elements)

> 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)


You can try increasing the number of partitions so the spills become
even smaller.

Also check whether you have skew in the stage that precedes the
one where it fails.
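A minimal sketch of bumping the shuffle parallelism in Spark 1.6 before the failing
join runs (the value is arbitrary, tune it to your data):

```
// Spark 1.6: raise the SQL shuffle partitions so each spill stays well under 2 GiB
sqlContext.setConf("spark.sql.shuffle.partitions", "4000")

// for plain RDD shuffles, pass a higher numPartitions to the wide operation instead,
// e.g. rdd.reduceByKey(_ + _, 4000)
```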

On Thu, Aug 16, 2018 at 11:25 AM Deepak Sharma  wrote:
>
> Hi All,
> I am running a Spark-based ETL job on Spark 1.6 and facing this weird issue.
> The same code with the same properties/configuration runs fine in another 
> environment, e.g. PROD, but never completes in CAT.
> The only difference is the size of the data it is processing, and only by 
> 1-2 GB.
> This is the stack trace: java.lang.IndexOutOfBoundsException: len is negative
> at org.spark-project.guava.io.ByteStreams.read(ByteStreams.java:895)
> at 
> org.spark-project.guava.io.ByteStreams.readFully(ByteStreams.java:733)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.loadNext(UnsafeSorterSpillReader.java:76)
> at 
> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.loadNext(UnsafeExternalSorter.java:509)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:136)
> at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.next(UnsafeExternalRowSorter.java:123)
> at 
> org.apache.spark.sql.execution.RowIteratorFromScala.advanceNext(RowIterator.scala:84)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.advancedStreamed(SortMergeJoin.scala:272)
> at 
> org.apache.spark.sql.execution.joins.SortMergeJoinScanner.findNextOuterJoinRows(SortMergeJoin.scala:233)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeOuterJoin.scala:250)
> at 
> org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeOuterJoin.scala:283)
> at 
> org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:748)
>
> Did anyone faced this issue?
> If yes , what can i do to resolve this?
>
> Thanks
> Deepak



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: groupBy and then coalesce impacts shuffle partitions in unintended way

2018-08-08 Thread Vadim Semenov
`coalesce` caps the number of partitions of the last stage (it gets folded
into the preceding shuffle rather than adding a new one), so you have to use
`repartition` instead, which introduces an extra shuffle stage
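
For illustration, a minimal sketch of the difference (the paths and the
column name are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("coalesce-vs-repartition").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "2048")

val df = spark.read.parquet("/tmp/input")  // hypothetical input

// coalesce(100) is folded into the aggregation stage, so the reduce side of
// the groupBy shuffle runs with only 100 tasks:
df.groupBy(col("column")).count().coalesce(100).write.parquet("/tmp/out-coalesce")

// repartition(100) keeps the 2048-partition shuffle for the groupBy and adds
// an extra exchange that brings the result down to 100 output files:
df.groupBy(col("column")).count().repartition(100).write.parquet("/tmp/out-repartition")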

On Wed, Aug 8, 2018 at 3:47 PM Koert Kuipers  wrote:
>
> one small correction: lots of files leads to pressure on the spark driver 
> program when reading this data in spark.
>
> On Wed, Aug 8, 2018 at 3:39 PM, Koert Kuipers  wrote:
>>
>> hi,
>>
>> i am reading data from files into a dataframe, then doing a groupBy for a 
>> given column with a count, and finally i coalesce to a smaller number of 
>> partitions before writing out to disk. so roughly:
>>
>> spark.read.format(...).load(...).groupBy(column).count().coalesce(100).write.format(...).save(...)
>>
>> i have this setting: spark.sql.shuffle.partitions=2048
>>
>> i expect to see 2048 partitions in shuffle. what i am seeing instead is a 
>> shuffle with only 100 partitions. it's like the coalesce has taken over the 
>> partitioning of the groupBy.
>>
>> any idea why?
>>
>> i am doing coalesce because it is not helpful to write out 2048 files, lots 
>> of files leads to pressure down the line on executors reading this data (i 
>> am writing to just one partition of a larger dataset), and since i have less 
>> than 100 executors i expect it to be efficient. so sounds like a good idea, 
>> no?
>>
>> but i do need 2048 partitions in my shuffle due to the operation i am doing 
>> in the groupBy (in my real problem i am not just doing a count...).
>>
>> thanks!
>> koert
>>
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Broadcast variable size limit?

2018-08-05 Thread Vadim Semenov
That’s the max size of a byte array in Java: the array length is defined as an
int, and in most JVMs arrays can’t hold more than
Int.MaxValue - 8 elements. One way to get around this is to create multiple
broadcast variables
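
A hedged sketch of the multiple-broadcast-variables idea (the data and the
chunk size are made up; the point is only that no single broadcast payload
has to hit the array-length limit):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("chunked-broadcast").getOrCreate()
val sc = spark.sparkContext

val big: Array[Long] = Array.tabulate(10000000)(_.toLong)  // stand-in for the large value

val chunkSize = 1000000
val parts = big.grouped(chunkSize).map(chunk => sc.broadcast(chunk)).toArray

// On the executors, read through the chunks instead of one huge array.
val total = sc.parallelize(1 to 10)
  .map(_ => parts.iterator.map(_.value.length.toLong).sum)
  .first()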

On Sunday, August 5, 2018, klrmowse  wrote:

> i don't need more, per se... i just need to watch the size of the variable;
> then, if it's within the size limit, go ahead and broadcast it; if not,
> then
> i won't broadcast...
>
> so, that would be a yes then? (2 GB, or which is it exactly?)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Sent from my iPhone


Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Vadim Semenov
object MyDatabseSingleton {
  @transient
  lazy val dbConn = DB.connect(…) // `DB.connect` stands in for whatever client you use
}

`transient` marks the variable to be excluded from serialization,

and `lazy` opens the connection only when it's first needed and also makes
sure that the val is initialized in a thread-safe way

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
http://code-o-matic.blogspot.com/2009/05/double-checked-locking-idiom-sweet-in.html
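
A more complete, hedged sketch of that pattern, using plain JDBC as a
stand-in for whatever client `DB.connect` wraps (the URL and the shutdown
hook are assumptions, not something from this thread):

import java.sql.{Connection, DriverManager}

object ExecutorDbConnection {
  // A Scala object is instantiated once per JVM, so this gives one connection per executor.
  lazy val conn: Connection = {
    val c = DriverManager.getConnection("jdbc:postgresql://db-host:5432/mydb") // hypothetical URL
    // There is no per-executor "job finished" callback, so close on JVM shutdown instead.
    sys.addShutdownHook { if (!c.isClosed) c.close() }
    c
  }
}

// Usage inside a task (the connection is shared by all tasks on that executor):
// rdd.foreachPartition { rows =>
//   val c = ExecutorDbConnection.conn
//   rows.foreach { row => /* write `row` using `c` */ }
// }
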
On Mon, Jul 30, 2018 at 1:32 PM kant kodali  wrote:
>
> Hi Patrick,
>
> This object must be serializable right? I wonder if I will access to this 
> object in my driver(since it is getting created on the executor side) so I 
> can close when I am done with my batch?
>
> Thanks!
>
> On Mon, Jul 30, 2018 at 7:37 AM, Patrick McGloin  
> wrote:
>>
>> You could use an object in Scala, of which only one instance will be created 
>> on each JVM / Executor. E.g.
>>
>> object MyDatabseSingleton {
>> var dbConn = ???
>> }
>>
>> On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:
>>>
>>> Hi All,
>>>
>>> I understand creating a connection forEachPartition but I am wondering can 
>>> I create one DB connection per executor and close it after the job is done? 
>>> any sample code would help. you can imagine I am running a simple batch 
>>> processing application.
>>>
>>> Thanks!
>
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread Vadim Semenov
`spark.worker.cleanup.enabled=true` doesn't work for YARN.
On Fri, Jul 27, 2018 at 8:52 AM dineshdharme  wrote:
>
> I am trying to do few (union + reduceByKey) operations on a hiearchical
> dataset in a iterative fashion in rdd. The first few loops run fine but on
> the subsequent loops, the operations ends up using the whole scratch space
> provided to it.
>
> I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
> having 100 GB space.
> The heirarchical dataset, whose size is (< 400kB), remains constant
> throughout the iterations.
> I have tried the worker cleanup flag but it has no effect i.e.
> "spark.worker.cleanup.enabled=true"
>
>
>
> Error :
> Caused by: java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:326)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> What I am trying to do (High Level):
>
> I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
> Child22 ) which are related in a hierarchical fashion as shown below.
>
> Parent-> Child1 -> Child2  -> Child21
>
> Parent-> Child1 -> Child2  -> Child22
>
> Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
> cat2, num1, num2,., num10)
>
> I am trying to aggregate the values of one column of Child21 into Child1
> (i.e. 2 levels up). I am doing the same for another column value of Child22
> into Child1. Then I am merging these aggregated values at the same Child1
> level.
>
> This is present in the code at location :
>
> spark.rddexample.dummyrdd.tree.child1.events.Function1
>
>
> Code which replicates the issue:
>
> 1] https://github.com/dineshdharme/SparkRddShuffleIssue
>
>
>
> Steps to reproduce the issue :
>
> 1] Clone the above repository.
>
> 2] Put the csvs in the "issue-data" folder in the above repository at a
> hadoop location "hdfs:///tree/dummy/data/"
>
> 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
> large space. (> 100 GB)
>
> 4] Run "sbt assembly"
>
> 5] Run the following command at the project location
>
> /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> --class spark.rddexample.dummyrdd.FunctionExecutor \
> --master local[2] \
> --deploy-mode client \
> --executor-memory 2G \
> --driver-memory 2G \
> target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> 20 \
> hdfs:///tree/dummy/data/ \
> hdfs:///tree/dummy/results/
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataframe joins - UnsupportedOperationException: Unimplemented type: IntegerType

2018-07-09 Thread Vadim Semenov
That usually happens when you have different types for the same column across
parquet files.
In this case, I think you have a column of `Long` type that got a file with
`Integer` type; I had to deal with a similar problem once.
You would have to cast it yourself to Long.
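
A hedged sketch of what that could look like (the column name and path are
made up): read the offending files with the narrower type they actually
contain, cast up to `Long`, and only then union/join them with the rest.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().appName("cast-mixed-parquet").getOrCreate()

// Hypothetical column and path; the point is only the explicit schema + cast.
val intSchema = StructType(Seq(StructField("business_id", IntegerType)))

val fixed = spark.read.schema(intSchema)
  .parquet("/user/hive/warehouse/analytics.db/myTable/BUSINESS_ID=123")
  .withColumn("business_id", col("business_id").cast("long"))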

On Mon, Jul 9, 2018 at 2:53 PM Nirav Patel  wrote:

> I am getting following error after performing joins between 2 dataframe.
> It happens on call to .show() method. I assume it's an issue with
> incompatible type but it's been really hard to identify which column of
> which dataframe have that incompatibility.
> Any pointers?
>
>
> 11:06:10.304 13700 [Executor task launch worker for task 16] WARN
>  o.a.s.s.e.datasources.FileScanRDD - Skipped the rest of the content in the
> corrupted file: path:
> maprfs:///user/hive/warehouse/analytics.db/myTable/BUSINESS_ID=123/part-0-b01dbc82-9bc3-43c5-89c6-4c9b2d407106.c000.snappy.parquet,
> range: 0-14248, partition values: [1085]
> java.lang.UnsupportedOperationException: Unimplemented type: IntegerType
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:431)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:203)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
> at
> org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
> at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:154)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
> at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown
> Source)
> at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
> Source)
> at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
> at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
>



-- 
Sent from my iPhone


Re: Dynamic allocation not releasing executors after unpersisting all cached data

2018-07-09 Thread Vadim Semenov
Try doing `unpersist(blocking=true)`
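
For example, assuming `df` is the DataFrame that was persisted in the
notebook:

df.unpersist(blocking = true)  // returns only after the cached blocks have actually been removed
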
On Mon, Jul 9, 2018 at 2:59 PM Jeffrey Charles
 wrote:
>
> I'm persisting a dataframe in Zeppelin which has dynamic allocation enabled 
> to get a sense of how much memory the dataframe takes up. After I note the 
> size, I unpersist the dataframe. For some reason, Yarn is not releasing the 
> executors that were added to Zeppelin. If I don't run the persist and 
> unpersist steps, the executors that were added are removed about a minute 
> after the paragraphs complete. Looking at the storage tab in the Spark UI for 
> the Zeppelin job, I don't see anything cached.
>
> Is there any way to get Yarn to automatically remove executors after doing a 
> persist followed by an unpersist if there is no activity on the executor 
> within the configured dynamic allocation timeout (similar to how it works 
> without a persist/unpersist cycle) without having to set 
> spark.dynamicAllocation.cachedExecutorIdleTimeout? The main reason I'd like 
> to avoid setting that configuration is I do not want to the executors being 
> reclaimed if they do have cached data.



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Inferring Data driven Spark parameters

2018-07-03 Thread Vadim Semenov
You can't change the executor/driver cores/memory on the fly once
you've already started a Spark Context.
On Tue, Jul 3, 2018 at 4:30 AM Aakash Basu  wrote:
>
> We aren't using Oozie or similar, moreover, the end to end job shall be 
> exactly the same, but the data will be extremely different (number of 
> continuous and categorical columns, vertical size, horizontal size, etc), 
> hence, if there would have been a calculation of the parameters to arrive at 
> a conclusion that we can simply get the data and derive the respective 
> configuration/parameters, it would be great.
>
> On Tue, Jul 3, 2018 at 1:09 PM, Jörn Franke  wrote:
>>
>> Don’t do this in your job. Create for different types of jobs different jobs 
>> and orchestrate them using oozie or similar.
>>
>> On 3. Jul 2018, at 09:34, Aakash Basu  wrote:
>>
>> Hi,
>>
>> Cluster - 5 node (1 Driver and 4 workers)
>> Driver Config: 16 cores, 32 GB RAM
>> Worker Config: 8 cores, 16 GB RAM
>>
>> I'm using the below parameters from which I know the first chunk is cluster 
>> dependent and the second chunk is data/code dependent.
>>
>> --num-executors 4
>> --executor-cores 5
>> --executor-memory 10G
>> --driver-cores 5
>> --driver-memory 25G
>>
>>
>> --conf spark.sql.shuffle.partitions=100
>> --conf spark.driver.maxResultSize=2G
>> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
>> --conf spark.scheduler.listenerbus.eventqueue.capacity=2
>>
>> I've come upto these values depending on my R on the properties and the 
>> issues I faced and hence the handles.
>>
>> My ask here is -
>>
>> 1) How can I infer, using some formula or a code, to calculate the below 
>> chunk dependent on the data/code?
>> 2) What are the other usable properties/configurations which I can use to 
>> shorten my job runtime?
>>
>> Thanks,
>> Aakash.
>
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [G1GC] -XX: -ResizePLAB How to provide in Spark Submit

2018-07-03 Thread Vadim Semenov
As with typical `JAVA_OPTS`, you need to pass everything as a single parameter:

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:-ResizePLAB"

Also, you have an extra space in the parameter: there should be no space
after the colon.
On Tue, Jul 3, 2018 at 3:01 AM Aakash Basu  wrote:
>
> Hi,
>
> I used the below in the Spark Submit for using G1GC -
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
>
> Now, I want to use -XX: -ResizePLAB of the G1GC to control to avoid 
> performance degradation caused by a large number of thread communications.
>
> How to do it? I tried submitting in the similar fashion -
>
> --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" --conf 
> "spark.executor.extraJavaOptions=-XX: -ResizePLAB", but it doesn't work.
>
> Thanks,
> Aakash.



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can we get the partition Index in an UDF

2018-06-25 Thread Vadim Semenov
Try using `TaskContext`:

import org.apache.spark.TaskContext
val partitionId = TaskContext.getPartitionId()
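
A hedged sketch of using it inside a UDF for the seeded-random use case in
the quoted message below (the global seed and the mixing constant are made
up):

import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import scala.util.Random

val spark = SparkSession.builder().appName("partition-seeded-udf").getOrCreate()
import spark.implicits._

val globalSeed = 42L  // hypothetical global seed

val withNoise = udf { (id: Long) =>
  // The UDF body runs inside a task on an executor, so TaskContext is available here.
  new Random(globalSeed + TaskContext.getPartitionId().toLong * 1000003L + id).nextDouble()
}

spark.range(0, 1000).withColumn("noise", withNoise($"id")).show(5)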

On Mon, Jun 25, 2018 at 11:17 AM Lalwani, Jayesh
 wrote:
>
> We are trying to add a column to a Dataframe with some data that is seeded by 
> some random data. We want to be able to control the seed, so multiple runs of 
> the same transformation generate the same output. We also want to generate 
> different random numbers for each partition
>
>
>
> This is easy to do with mapPartitionsWithIndex. For each partition, we 
> generate a Random number generator that is seeded with a global seed + index 
> of partition. The problem with this is mapPartitionsWithIndex is a blackbox, 
> and any filter predicates that are added after mapPartitionsWithIndex don’t 
> get pushed down to source
>
>
>
> If we implement this function as an UDF, we can get the filters pushed down 
> to the source, but we don’t have the partition index.
>
>
>
> Yes, I know we could use the mapPartitionsWithIndex after the filter. That is 
> what we will probably end up doing. I was wondering if there is a way of 
> implementing this without having to move the filter around.
>
>
> 
>
> The information contained in this e-mail is confidential and/or proprietary 
> to Capital One and/or its affiliates and may only be used solely in 
> performance of work or services for Capital One. The information transmitted 
> herewith is intended only for use by the individual or entity to which it is 
> addressed. If the reader of this message is not the intended recipient, you 
> are hereby notified that any review, retransmission, dissemination, 
> distribution, copying or other use of, or taking of any action in reliance 
> upon this information is strictly prohibited. If you have received this 
> communication in error, please contact the sender and delete the material 
> from your computer.



-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Writing rows directly in Tungsten format into memory

2018-06-12 Thread Vadim Semenov
Is there a way to write rows directly into off-heap memory in the Tungsten
format bypassing creating objects?

I have a lot of rows, and right now I'm creating objects, and they get
encoded, but because of the number of rows, it creates significant pressure
on GC. I'd like to avoid creating objects at all, and since I know the
schema, I was wondering if I could just somehow write rows directly into
off-heap?

Thank you.


Re: Time series data

2018-05-24 Thread Vadim Semenov
Yeah, it depends on what you want to do with that time-series data. We at
Datadog process trillions of points daily using Spark. I can't really go
into what exactly we do with the data, but Spark can
handle the volume, scale well, and be fault-tolerant, albeit everything I
said comes with multiple asterisks.

On Thursday, May 24, 2018, amin mohebbi  wrote:

> Could you please help me to understand  the performance that we get from
> using spark with any nosql or TSDB ? We receive 1 mil meters x 288 readings
> = 288 mil rows (Approx. 360 GB per day) – Therefore, we will end up with
> 10's or 100's of TBs of data and I feel that NoSQL will be much quicker
> than Hadoop/Spark. This is time series data that are coming from many
> devices in form of flat files and it is currently extracted / transformed 
> /loaded
> into another database which is connected to BI tools. We might use azure
> data factory to collect the flat files and then use spark to do the ETL(not
> sure if it is correct way) and then use spark to join table or do the
> aggregations and save them into a db (preferably nosql not sure).
> Finally, connect deploy Power BI to get visualize the data from nosql db.
> My questions are :
>
> 1- Is the above mentioned correct architecture? using spark with nosql as
> I think combination of these two could help to have random access and run
> many queries by different users.
> 2- do we really need to use a time series db?
>
>
> Best Regards ... Amin
> Mohebbi PhD candidate in Software Engineering   at university of Malaysia
> Tel : +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my
> amin_...@me.com
>


-- 
Sent from my iPhone


Re:

2018-05-16 Thread Vadim Semenov
After downsizing to 20 partitions, some of your partitions become too big;
I see that you're doing caching, and the executors try to write the big
partitions to disk but fail because they exceed 2GiB

> Caused by: java.lang.IllegalArgumentException: Size exceeds
Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:125)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
apply(DiskStore.scala:124)

You can try to coalesce to 100 and reduce the number of executors to keep
the load on MySQL reasonable

On Wed, May 16, 2018 at 5:36 AM, Davide Brambilla <
davide.brambi...@contentwise.tv> wrote:

> Hi all,
>we have a dataframe with 1000 partitions and we need to write the
> dataframe into a MySQL using this command:
>
> df.coalesce(20)
> df.write.jdbc(url=url,
>   table=table,
>   mode=mode,
>   properties=properties)
>
> and we get this errors randomly
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:125)
> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$4.
> apply(DiskStore.scala:124)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:126)
> at org.apache.spark.storage.BlockManager.getLocalValues(
> BlockManager.scala:520)
> at org.apache.spark.storage.BlockManager.get(BlockManager.scala:693)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(
> BlockManager.scala:753)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1690)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1678)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1677)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1677)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:855)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:855)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1905)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1860)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1849)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:671)
> at 

Re: Free Column Reference with $

2018-05-04 Thread Vadim Semenov
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SQLImplicits.scala#L38-L47

It's called String Interpolation
See "Advanced Usage" here
https://docs.scala-lang.org/overviews/core/string-interpolation.html
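
In short, `spark.implicits._` brings in a `StringContext` extension
(`StringToColumn`) so that `$"..."` builds a `ColumnName`; a small usage
sketch:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dollar-columns").getOrCreate()
import spark.implicits._  // brings the StringToColumn implicit class into scope

val df = Seq(("a", 1), ("b", 2)).toDF("name", "value")

// $"value" desugars to StringContext("value").$(), which returns a ColumnName (a Column).
df.select($"name", $"value" + 1).show()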

On Fri, May 4, 2018 at 10:10 AM, Christopher Piggott 
wrote:

> How does $"something" actually work (from a scala perspective) as a free
> column reference?
>
>


-- 
Sent from my iPhone


Re: spark.executor.extraJavaOptions inside application code

2018-05-02 Thread Vadim Semenov
You need to pass config before creating a session

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
// The first call sets a JVM system property on the executors;
// the next two (which are equivalent to each other) set an executor environment variable instead.
conf.set("spark.executor.extraJavaOptions", "-Dbasicauth=myuser:mypassword")
conf.set("spark.executorEnv.basicauth", "myuser:mypassword")
conf.setExecutorEnv("basicauth", "myuser:mypassword")
val spark = SparkSession.builder().config(conf).appName("…").getOrCreate()


On Wed, May 2, 2018 at 6:59 AM, Agostino Calamita <
agostino.calam...@gmail.com> wrote:

> Hi all,
> I wrote an application that needs an environment variable. I can set this
> variable with
>
> --conf 'spark.executor.extraJavaOptions=-Dbasicauth=myuser:mypwd'
>
> in spark-submit and it works well in standalone cluster mode.
>
> But, I want set it inside the application code, because the variable
> contains a password.
>
> How can I do ?
>
> I tried with:
>
> SparkSession spark = SparkSession
>   .builder()
>   .appName("Java Spark Solr ETL")
>   .getOrCreate();
>
> 
> spark.sparkContext().conf().setExecutorEnv("spark.executor.extraJavaOptions",
> "-Dbasicauth=myuser:mypassword");
>
> but it doesn't work.
>
> Thanks.
>



-- 
Sent from my iPhone


Re: [Spark 2.x Core] .collect() size limit

2018-04-30 Thread Vadim Semenov
`.collect` returns an Array, and arrays can't have more than Int.MaxValue
elements, and in most JVMs the limit is lower: `Int.MaxValue - 8`.
So that puts an upper limit; however, you can create an Array of Arrays, and
so on, basically limitless, albeit with some gymnastics.
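
One way to get that "Array of Arrays" shape without any manual chunking is to
collect each partition as its own array; a small sketch (the data is made
up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("array-of-arrays").getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 10000000, numSlices = 200)

// glom() turns each partition into a single Array, so collect() returns
// Array[Array[Int]]: no single array has to hold all of the rows.
val chunks: Array[Array[Int]] = rdd.glom().collect()
val total = chunks.iterator.map(_.length.toLong).sum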


Re: Tuning Resource Allocation during runtime

2018-04-27 Thread Vadim Semenov
You cannot dynamically change the number of cores per executor or cores
per task, but you can change the number of executors.

In one of my jobs I have something like this, so when I know that I don't
need more than 4 executors, I kill all other executors (assuming that they
don't hold any cached data), and they join other jobs (thanks to dynamic
allocation)


// At this point we have 1500 parquet files,
// but we want 100 files, which means about 4 executors can process everything,
// assuming that each of them can process ~30 tasks.
// So we can let the other executors leave the job.
val executors = SparkContextUtil.getExecutorIds(sc)
executors.take(executors.size - 4).foreach(sc.killExecutor)


package org.apache.spark

/**
 * `SparkContextUtil` gives access to private methods
 */
object SparkContextUtil {
  def getExecutorIds(sc: SparkContext): Seq[String] =
    sc.getExecutorIds.filter(_ != SparkContext.DRIVER_IDENTIFIER)
}




On Fri, Apr 27, 2018 at 3:52 AM, Donni Khan 
wrote:

> Hi All,
>
> Is there any way to change the  number of executors/cores  during running
> Saprk Job.
> I have Spark Job containing two tasks: First task need many executors to
> run fastly. the second task has many input and output opeartions and
> shuffling, so it needs  few executors, otherwise it taks loong time to
> finish.
> Does anyone knows if that possible in YARN?
>
>
> Thank you.
> Donni
>



-- 
Sent from my iPhone


Re: Spark Job Server application compilation issue

2018-03-14 Thread Vadim Semenov
This question should be directed to the `spark-jobserver` group:
https://github.com/spark-jobserver/spark-jobserver#contact

They also have a gitter chat.

Also include the errors you get when you ask them a question

On Wed, Mar 14, 2018 at 1:37 PM, sujeet jog  wrote:

>
> Input is a json request, which would be decoded in myJob() & processed
> further.
>
> Not sure what is wrong with below code, it emits errors as unimplemented
> methods (runJob/validate),
> any pointers on this would be helpful,
>
> jobserver-0.8.0
>
> object MyJobServer extends SparkSessionJob {
>
>   type JobData = String
>   type JobOutput = Seq[String]
>
>   def myJob(a : String)  = {
> }
>
>   def runJob(sc: SparkContext, runtime: JobEnvironment, data: JobData):
> JobOutput = {
>myJob(a)
>}
>
>  def validate(sc: SparkContext, runtime: JobEnvironment, config: Config):
> JobData Or Every[ValidationProblem] = {
>Good(config.root().render())
>  }
>
>


-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
But overall, I think the original approach is not right:
if you end up with a single file of tens of GBs, the approach probably needs
to be reworked.

I don't see why you can't just write multiple CSV files using Spark, and
then concatenate them without Spark
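
A hedged sketch of that idea, using Hadoop's `FileUtil.copyMerge` to do the
concatenation outside of Spark's DAG (it exists in Hadoop 2.x and was removed
in Hadoop 3; the paths are made up):

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("merge-csv-parts").getOrCreate()
val df = spark.read.csv("data/input.csv")  // stand-in for the pre-processed DataFrame

// 1. Let Spark write many part files in parallel.
df.write.csv("data/parts")

// 2. Concatenate the part files into a single file without involving Spark.
val conf = spark.sparkContext.hadoopConfiguration
val fs = FileSystem.get(conf)
FileUtil.copyMerge(fs, new Path("data/parts"), fs, new Path("data/file.csv"), false, conf, null)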

On Fri, Mar 9, 2018 at 10:02 AM, Vadim Semenov <va...@datadoghq.com> wrote:

> You can use `.checkpoint` for that
>
> `df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` to have
> only one partition, so sorting will take a lot of time
>
> `df.sort(…).repartition(1).write...` — `repartition` will add an explicit
> stage, but sorting will be lost, since it's a repartition
>
> ```
> sc.setCheckpointDir("/tmp/test")
> val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all
> partitions
> checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed
> partitions in one task, concatenate them, and will write them out as a
> single file
> ```
>
> On Fri, Mar 9, 2018 at 9:47 AM, Deepak Sharma <deepakmc...@gmail.com>
> wrote:
>
>> I would suggest repartioning it to reasonable partitions  may ne 500 and
>> save it to some intermediate working directory .
>> Finally read all the files from this working dir and then coalesce as 1
>> and save to final location.
>>
>> Thanks
>> Deepak
>>
>> On Fri, Mar 9, 2018, 20:12 Vadim Semenov <va...@datadoghq.com> wrote:
>>
>>> because `coalesce` gets propagated further up in the DAG in the last
>>> stage, so your last stage only has one task.
>>>
>>> You need to break your DAG so your expensive operations would be in a
>>> previous stage before the stage with `.coalesce(1)`
>>>
>>> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
>>>> Dear All,
>>>>
>>>> I have a tiny CSV file, which is around 250MB. There are only 30
>>>> columns in the DataFrame. Now I'm trying to save the pre-processed
>>>> DataFrame as an another CSV file on disk for later usage.
>>>>
>>>> However, I'm getting pissed off as writing the resultant DataFrame is
>>>> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
>>>> file written on the disk is about 58GB!
>>>>
>>>> Here's the sample code that I tried:
>>>>
>>>> # Using repartition()
>>>> myDF.repartition(1).write.format("com.databricks.spark.csv")
>>>> .save("data/file.csv")
>>>>
>>>> # Using coalesce()
>>>> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("d
>>>> ata/file.csv")
>>>>
>>>>
>>>> Any better suggestion?
>>>>
>>>>
>>>>
>>>> 
>>>> Md. Rezaul Karim, BSc, MSc
>>>> Research Scientist, Fraunhofer FIT, Germany
>>>>
>>>> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>>>>
>>>> eMail: rezaul.ka...@fit.fraunhofer.de
>>>> <andrea.berna...@fit.fraunhofer.de>
>>>> Tel: +49 241 80-21527 <+49%20241%208021527>
>>>>
>>>
>>>
>>>
>>> --
>>> Sent from my iPhone
>>>
>>
>
>
> --
> Sent from my iPhone
>



-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
You can use `.checkpoint` for that

`df.sort(…).coalesce(1).write...` — `coalesce` will make `sort` to have
only one partition, so sorting will take a lot of time

`df.sort(…).repartition(1).write...` — `repartition` will add an explicit
stage, but sorting will be lost, since it's a repartition

```
sc.setCheckpointDir("/tmp/test")
val checkpointedDf = df.sort(…).checkpoint(eager=true) // will save all
partitions
checkpointedDf.coalesce(1).write.csv(…) // will load checkpointed
partitions in one task, concatenate them, and will write them out as a
single file
```

On Fri, Mar 9, 2018 at 9:47 AM, Deepak Sharma <deepakmc...@gmail.com> wrote:

> I would suggest repartioning it to reasonable partitions  may ne 500 and
> save it to some intermediate working directory .
> Finally read all the files from this working dir and then coalesce as 1
> and save to final location.
>
> Thanks
> Deepak
>
> On Fri, Mar 9, 2018, 20:12 Vadim Semenov <va...@datadoghq.com> wrote:
>
>> because `coalesce` gets propagated further up in the DAG in the last
>> stage, so your last stage only has one task.
>>
>> You need to break your DAG so your expensive operations would be in a
>> previous stage before the stage with `.coalesce(1)`
>>
>> On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Dear All,
>>>
>>> I have a tiny CSV file, which is around 250MB. There are only 30 columns
>>> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
>>> another CSV file on disk for later usage.
>>>
>>> However, I'm getting pissed off as writing the resultant DataFrame is
>>> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
>>> file written on the disk is about 58GB!
>>>
>>> Here's the sample code that I tried:
>>>
>>> # Using repartition()
>>> myDF.repartition(1).write.format("com.databricks.spark.
>>> csv").save("data/file.csv")
>>>
>>> # Using coalesce()
>>> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
>>> data/file.csv")
>>>
>>>
>>> Any better suggestion?
>>>
>>>
>>>
>>> 
>>> Md. Rezaul Karim, BSc, MSc
>>> Research Scientist, Fraunhofer FIT, Germany
>>>
>>> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>>>
>>> eMail: rezaul.ka...@fit.fraunhofer.de
>>> <andrea.berna...@fit.fraunhofer.de>
>>> Tel: +49 241 80-21527 <+49%20241%208021527>
>>>
>>
>>
>>
>> --
>> Sent from my iPhone
>>
>


-- 
Sent from my iPhone


Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Vadim Semenov
because `coalesce` gets propagated further up in the DAG in the last stage,
so your last stage only has one task.

You need to break your DAG so your expensive operations would be in a
previous stage before the stage with `.coalesce(1)`

On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear All,
>
> I have a tiny CSV file, which is around 250MB. There are only 30 columns
> in the DataFrame. Now I'm trying to save the pre-processed DataFrame as an
> another CSV file on disk for later usage.
>
> However, I'm getting pissed off as writing the resultant DataFrame is
> taking too long, which is about 4 to 5 hours. Nevertheless, the size of the
> file written on the disk is about 58GB!
>
> Here's the sample code that I tried:
>
> # Using repartition()
> myDF.repartition(1).write.format("com.databricks.spark.
> csv").save("data/file.csv")
>
> # Using coalesce()
> myDF. coalesce(1).write.format("com.databricks.spark.csv").save("
> data/file.csv")
>
>
> Any better suggestion?
>
>
>
> 
> Md. Rezaul Karim, BSc, MSc
> Research Scientist, Fraunhofer FIT, Germany
>
> Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany
>
> eMail: rezaul.ka...@fit.fraunhofer.de 
> Tel: +49 241 80-21527 <+49%20241%208021527>
>



-- 
Sent from my iPhone


Re: Spark & S3 - Introducing random values into key names

2018-03-08 Thread Vadim Semenov
You need to put randomness into the beginning of the key, if you put it
other than into the beginning, it's not guaranteed that you're going to
have good performance.

The way we achieved this is by writing to HDFS first, and then having a
custom DistCp implemented using Spark that copies parquet files using
random keys,
and then saves the list of resulting keys to S3, and when we want to use
those parquet files, we just need to load the listing file, and then take
keys from it and pass them into the loader.

You only need to do this when you have way too many files; if the number of
keys you operate on is reasonably small (say, in the thousands), you won't
see any benefit.

Also, S3 buckets have internal optimizations, and over time a bucket adjusts to
the workload, i.e. some additional underlying partitions get added,
some splits happen, etc.
If you want good performance from the start, you would need to use
randomization, yes.
Or alternatively, you can contact AWS and tell them about the naming schema
that you're going to have (but it must be set in stone), and then they can
try to pre-optimize the bucket for you.
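
A minimal sketch of the random-prefix idea (the bucket name, prefix scheme,
and listing location are all made up):

import org.apache.spark.sql.SparkSession
import scala.util.Random

val spark = SparkSession.builder().appName("random-s3-prefix").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2)).toDF("k", "v")  // stand-in for the real data

// The randomness goes at the *beginning* of the key so writes spread across S3's partitions.
val prefix = f"${Random.nextInt(256)}%02x"                      // e.g. "3f"
val target = s"s3a://my-bucket/$prefix/events/dt=2018-03-08/"   // hypothetical bucket/layout

df.write.parquet(target)

// Record where the data actually went so readers can discover the random prefixes later.
Seq(target).toDS.write.mode("append").text("s3a://my-bucket/_listings/events/")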

On Thu, Mar 8, 2018 at 11:42 AM, Subhash Sriram 
wrote:

> Hey Spark user community,
>
> I am writing Parquet files from Spark to S3 using S3a. I was reading this
> article about improving S3 bucket performance, specifically about how it
> can help to introduce randomness to your key names so that data is written
> to different partitions.
>
> https://aws.amazon.com/premiumsupport/knowledge-
> center/s3-bucket-performance-improve/
>
> Is there a straight forward way to accomplish this randomness in Spark via
> the DataSet API? The only thing that I could think of would be to actually
> split the large set into multiple sets (based on row boundaries), and then
> write each one with the random key name.
>
> Is there an easier way that I am missing?
>
> Thanks in advance!
> Subhash
>
>
>


Re: OutOfDirectMemoryError for Spark 2.2

2018-03-06 Thread Vadim Semenov
Do you have a trace? i.e. what's the source of `io.netty.*` calls?

And have you tried bumping `-XX:MaxDirectMemorySize`?
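
For reference, bumping it would look something like this in spark-submit (the
value is only an example; since the trace above comes from the driver-side
RPC server, the driver option is probably the relevant one, with
`spark.executor.extraJavaOptions` as the executor-side analogue):

--conf "spark.driver.extraJavaOptions=-XX:MaxDirectMemorySize=2g"
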

On Tue, Mar 6, 2018 at 12:45 AM, Chawla,Sumit 
wrote:

> Hi All
>
> I have a job which processes a large dataset.  All items in the dataset
> are unrelated.  To save on cluster resources,  I process these items in
> chunks.  Since chunks are independent of each other,  I start and shut down
> the spark context for each chunk.  This allows me to keep DAG smaller and
> not retry the entire DAG in case of failures.   This mechanism used to work
> fine with Spark 1.6.  Now,  as we have moved to 2.2,  the job started
> failing with OutOfDirectMemoryError error.
>
> 2018-03-03 22:00:59,687 WARN  [rpc-server-48-1]
> server.TransportChannelHandler 
> (TransportChannelHandler.java:exceptionCaught(78))
> - Exception in connection from /10.66.73.27:60374
>
> io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 8388608
> byte(s) of direct memory (used: 1023410176, max: 1029177344)
>
> at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(
> PlatformDependent.java:506)
>
> at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(
> PlatformDependent.java:460)
>
> at io.netty.buffer.PoolArena$DirectArena.allocateDirect(
> PoolArena.java:701)
>
> at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:690)
>
> at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:237)
>
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:213)
>
> at io.netty.buffer.PoolArena.allocate(PoolArena.java:141)
>
> at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(
> PooledByteBufAllocator.java:271)
>
> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(
> AbstractByteBufAllocator.java:177)
>
> at io.netty.buffer.AbstractByteBufAllocator.directBuffer(
> AbstractByteBufAllocator.java:168)
>
> at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(
> AbstractByteBufAllocator.java:129)
>
> at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(
> AdaptiveRecvByteBufAllocator.java:104)
>
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(
> AbstractNioByteChannel.java:117)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> NioEventLoop.java:564)
>
> I got some clue on what is causing this from https://github.com/netty/
> netty/issues/6343,  However I am not able to add up numbers on what is
> causing 1 GB of Direct Memory to fill up.
>
> Output from jmap
>
>
> 7: 22230 1422720 io.netty.buffer.PoolSubpage
>
> 12: 1370 804640 io.netty.buffer.PoolSubpage[]
>
> 41: 3600 144000 io.netty.buffer.PoolChunkList
>
> 98: 1440 46080 io.netty.buffer.PoolThreadCache$SubPageMemoryRegionCache
>
> 113: 300 40800 io.netty.buffer.PoolArena$HeapArena
>
> 114: 300 40800 io.netty.buffer.PoolArena$DirectArena
>
> 192: 198 15840 io.netty.buffer.PoolChunk
>
> 274: 120 8320 io.netty.buffer.PoolThreadCache$MemoryRegionCache[]
>
> 406: 120 3840 io.netty.buffer.PoolThreadCache$NormalMemoryRegionCache
>
> 422: 72 3552 io.netty.buffer.PoolArena[]
>
> 458: 30 2640 io.netty.buffer.PooledUnsafeDirectByteBuf
>
> 500: 36 2016 io.netty.buffer.PooledByteBufAllocator
>
> 529: 32 1792 io.netty.buffer.UnpooledUnsafeHeapByteBuf
>
> 589: 20 1440 io.netty.buffer.PoolThreadCache
>
> 630: 37 1184 io.netty.buffer.EmptyByteBuf
>
> 703: 36 864 io.netty.buffer.PooledByteBufAllocator$PoolThreadLocalCache
>
> 852: 22 528 io.netty.buffer.AdvancedLeakAwareByteBuf
>
> 889: 10 480 io.netty.buffer.SlicedAbstractByteBuf
>
> 917: 8 448 io.netty.buffer.UnpooledHeapByteBuf
>
> 1018: 20 320 io.netty.buffer.PoolThreadCache$1
>
> 1305: 4 128 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
>
> 1404: 1 80 io.netty.buffer.PooledUnsafeHeapByteBuf
>
> 1473: 3 72 io.netty.buffer.PoolArena$SizeClass
>
> 1529: 1 64 io.netty.buffer.AdvancedLeakAwareCompositeByteBuf
>
> 1541: 2 64 io.netty.buffer.CompositeByteBuf$Component
>
> 1568: 1 56 io.netty.buffer.CompositeByteBuf
>
> 1896: 1 32 io.netty.buffer.PoolArena$SizeClass[]
>
> 2042: 1 24 io.netty.buffer.PooledUnsafeDirectByteBuf$1
>
> 2046: 1 24 io.netty.buffer.UnpooledByteBufAllocator
>
> 2051: 1 24 io.netty.buffer.PoolThreadCache$MemoryRegionCache$1
>
> 2078: 1 24 io.netty.buffer.PooledHeapByteBuf$1
>
> 2135: 1 24 io.netty.buffer.PooledUnsafeHeapByteBuf$1
>
> 2302: 1 16 io.netty.buffer.ByteBufUtil$1
>
> 2769: 1 16 io.netty.util.internal.__matchers__.io.netty.buffer.
> ByteBufMatcher
>
>
>
> My Driver machine has 32 CPUs,  and as of now i have 15 machines in my
> cluster.   As of now, the error happens on processing 5th or 6th chunk.  I
> suspect the error is dependent on number of Executors and would happen
> early if we add more executors.
>
>
> I am trying to come up an explanation of what is filling up the Direct
> Memory and how to quanitfy it as factor of Number of Executors.  Our
> cluster is shared cluster,  And we need to understand how much Driver
> Memory to allocate for most of the jobs.
>
>
>
>
>
> 

Re: Can I get my custom spark strategy to run last?

2018-03-02 Thread Vadim Semenov
Something like this?

sparkSession.experimental.extraStrategies = Seq(Strategy)

val logicalPlan = df.logicalPlan
val newPlan: LogicalPlan = Strategy(logicalPlan)

Dataset.ofRows(sparkSession, newPlan)


On Thu, Mar 1, 2018 at 8:20 PM, Keith Chapman 
wrote:

> Hi,
>
> I'd like to write a custom Spark strategy that runs after all the existing
> Spark strategies are run. Looking through the Spark code it seems like the
> custom strategies are prepended to the list of strategies in Spark. Is
> there a way I could get it to run last?
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Yeah, without actually seeing what's happening on that line, it'd be
difficult to say for sure.

You can check what patches HortonWorks applied, and/or ask them.

And yeah, a segfault is totally possible with any size of data. But you
should've seen it in `stdout` (assuming that the regular logs go to
`stderr`)

On Wed, Feb 28, 2018 at 2:53 PM, unk1102  wrote:

> Hi Vadim thanks I use HortonWorks package. I dont think there are any seg
> faults are dataframe I am trying to write is very small in size. Can it
> still create seg fault?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
Who's your spark provider? EMR, Azure, Databricks, etc.? Maybe contact
them, since they've probably applied some patches

Also have you checked `stdout` for some Segfaults? I vaguely remember
getting `Task failed while writing rows at` and seeing some segfaults that
caused that

On Wed, Feb 28, 2018 at 2:07 PM, unk1102  wrote:

> Hi thanks Vadim you are right I saw that line already 468 I dont see any
> code
> it is just comment yes I am sure I am using all spark-* jar which is built
> for spark 2.2.0 and Scala 2.11. I am also stuck unfortunately with these
> errors not sure how to solve them.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
I'm sorry, didn't see `Caused by:
java.lang.NullPointerException at
org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)`

Are you sure that you use 2.2.0?
I don't see any code on that line
https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L468

Also pretty strange that it fails there

On Wed, Feb 28, 2018 at 1:55 PM, unk1102  wrote:

> Hi thanks for the reply I only see NPE and Task failed while writing rows
> all
> over places I dont see any other errors expect SparkException job aborted
> and followed by two exception I pasted earlier.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: org.apache.spark.SparkException: Task failed while writing rows

2018-02-28 Thread Vadim Semenov
There should be another exception trace (basically, the actual cause) after
this one; could you post it?

On Wed, Feb 28, 2018 at 1:39 PM, unk1102  wrote:

> Hi I am getting the following exception when I try to write DataFrame using
> the following code. Please guide. I am using Spark 2.2.0.
>
> df.write.format("parquet").mode(SaveMode.Append);
>
> org.apache.spark.SparkException: Task failed while writing rows at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$
> spark$sql$execution$datasources$FileFormatWriter$$
> executeTask(FileFormatWriter.scala:270)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$
> write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$
> write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at
> org.apache.spark.scheduler.Task.run(Task.scala:108) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745) Caused by:
> java.lang.NullPointerException at
> org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
> at
> org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at
> scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$
> SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$
> apache$spark$sql$execution$datasources$FileFormatWriter$$
> executeTask$3.apply(FileFormatWriter.scala:256)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$
> apache$spark$sql$execution$datasources$FileFormatWriter$$
> executeTask$3.apply(FileFormatWriter.scala:254)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCa
> llbacks(Utils.scala:1371)
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$
> spark$sql$execution$datasources$FileFormatWriter$$
> executeTask(FileFormatWriter.scala:259)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
Yeah, for some reason (unknown to me, but you can find discussions on the AWS
forums) they double the actual number of cores for the NodeManagers.

I assume that's done to maximize utilization, but it doesn't really matter to
me, at least, since I only run Spark, so I personally set `total number
of cores - 1/2`, saving one core for the OS/DataNode/NodeManager, because
Spark itself can create a significant load.

On Mon, Feb 26, 2018 at 4:51 PM, Selvam Raman <sel...@gmail.com> wrote:

> Thanks. That’s make sense.
>
> I want to know one more think , available vcore per machine is 16 but
> threads per node 8. Am I missing to relate here.
>
> What I m thinking now is number of vote = number of threads.
>
>
>
> On Mon, 26 Feb 2018 at 18:45, Vadim Semenov <va...@datadoghq.com> wrote:
>
>> All used cores aren't getting reported correctly in EMR, and YARN itself
>> has no control over it, so whatever you put in `spark.executor.cores` will
>> be used,
>> but in the ResourceManager you will only see 1 vcore used per nodemanager.
>>
>> On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman <sel...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> spark version - 2.0.0
>>> spark distribution - EMR 5.0.0
>>>
>>> Spark Cluster - one master, 5 slaves
>>>
>>> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
>>> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>>>
>>>
>>> Cluster Metrics
>>> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
>>> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
>>> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
>>> NodesRebooted
>>> Nodes
>>> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
>>> <http://localhost:8088/cluster/nodes> 0
>>> <http://localhost:8088/cluster/nodes/decommissioning> 0
>>> <http://localhost:8088/cluster/nodes/decommissioned> 5
>>> <http://localhost:8088/cluster/nodes/lost> 0
>>> <http://localhost:8088/cluster/nodes/unhealthy> 0
>>> <http://localhost:8088/cluster/nodes/rebooted>
>>> I have submitted job with below configuration
>>> --num-executors 5 --executor-cores 10 --executor-memory 20g
>>>
>>>
>>>
>>> spark.task.cpus - be default 1
>>>
>>>
>>> My understanding is there will be 5 executore each can run 10 task at a
>>> time and task can share total memory of 20g. Here, i could see only 5
>>> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
>>> 10 core(number of threads), 1 Vcore(cpu).
>>>
>>> please correct me if my understand is wrong.
>>>
>>> how can i utilize number of vcore in EMR effectively. Will Vcore boost
>>> performance?
>>>
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Spark EMR executor-core vs Vcores

2018-02-26 Thread Vadim Semenov
Used cores aren't reported correctly in EMR, and YARN itself
has no control over it, so whatever you put in `spark.executor.cores` will
be used,
but in the ResourceManager you will only see 1 vcore used per NodeManager.

On Mon, Feb 26, 2018 at 5:20 AM, Selvam Raman  wrote:

> Hi,
>
> spark version - 2.0.0
> spark distribution - EMR 5.0.0
>
> Spark Cluster - one master, 5 slaves
>
> Master node - m3.xlarge - 8 vCore, 15 GiB memory, 80 SSD GB storage
> Slave node - m3.2xlarge - 16 vCore, 30 GiB memory, 160 SSD GB storage
>
>
> Cluster Metrics
> Apps SubmittedApps PendingApps RunningApps CompletedContainers RunningMemory
> UsedMemory TotalMemory ReservedVCores UsedVCores TotalVCores ReservedActive
> NodesDecommissioning NodesDecommissioned NodesLost NodesUnhealthy 
> NodesRebooted
> Nodes
> 16 0 1 15 5 88.88 GB 90.50 GB 22 GB 5 79 1 5
>  0
>  0
>  5
>  0
>  0
> 
> I have submitted job with below configuration
> --num-executors 5 --executor-cores 10 --executor-memory 20g
>
>
>
> spark.task.cpus - be default 1
>
>
> My understanding is there will be 5 executore each can run 10 task at a
> time and task can share total memory of 20g. Here, i could see only 5
> vcores used which means 1 executor instance use 20g+10%overhead ram(22gb),
> 10 core(number of threads), 1 Vcore(cpu).
>
> please correct me if my understand is wrong.
>
> how can i utilize number of vcore in EMR effectively. Will Vcore boost
> performance?
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Sharing spark executor pool across multiple long running spark applications

2018-02-07 Thread Vadim Semenov
Another way might be to launch a single SparkContext and then run jobs
inside of it.

You can take a look at these projects:
-
https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs
- http://livy.incubator.apache.org

Problems with this way:
- Can't update the code of your job.
- A single job can break the SparkContext.


We evaluated this approach but decided to go with dynamic allocation,
and we also had to rethink the way we write our jobs:
- Can't use caching since it locks executors; we have to use checkpointing,
which adds to computation time.
- Use some unconventional methods like reusing the same DF to write out
multiple separate things in one go.
- Sometimes remove executors from within the job, like when we know how
many we would need, so the executors could join other jobs.

On Tue, Feb 6, 2018 at 3:00 PM, Nirav Patel  wrote:

> Currently sparkContext and it's executor pool is not shareable. Each
> spakContext gets its own executor pool for entire life of an application.
> So what is the best ways to share cluster resources across multiple long
> running spark applications?
>
> Only one I see is spark dynamic allocation but it has high latency when it
> comes to real-time application.
>
>
>
>
> 


Re: Passing an array of more than 22 elements in a UDF

2017-12-26 Thread Vadim Semenov
Functions are still limited to 22 arguments

https://github.com/scala/scala/blob/2.13.x/src/library/scala/Function22.scala
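
As a rough sketch of the array workaround discussed below (the column names and the
sum logic are made up for the example), you can pack the values into a single array
column so the UDF only takes one Seq parameter:

```
import org.apache.spark.sql.functions.{array, col, udf}

val df: org.apache.spark.sql.DataFrame = ??? // assumed to have integer columns c1 ... c30

val combine = udf((values: Seq[Int]) => values.sum) // any logic over the packed values

val withResult = df.withColumn(
  "result",
  combine(array((1 to 30).map(i => col(s"c$i")): _*)) // 30 columns -> one array argument
)
```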

On Tue, Dec 26, 2017 at 2:19 PM, Felix Cheung 
wrote:

> Generally the 22 limitation is from Scala 2.10.
>
> In Scala 2.11, the issue with case class is fixed, but with that said I’m
> not sure whether other limitations might apply with UDFs in Java.
>
> _
> From: Aakash Basu 
> Sent: Monday, December 25, 2017 9:13 PM
> Subject: Re: Passing an array of more than 22 elements in a UDF
> To: Felix Cheung 
> Cc: ayan guha , user 
>
>
>
> What's the privilege of using that specific version for this? Please throw
> some light onto it.
>
> On Mon, Dec 25, 2017 at 6:51 AM, Felix Cheung 
> wrote:
>
>> Or use it with Scala 2.11?
>>
>> --
>> *From:* ayan guha 
>> *Sent:* Friday, December 22, 2017 3:15:14 AM
>> *To:* Aakash Basu
>> *Cc:* user
>> *Subject:* Re: Passing an array of more than 22 elements in a UDF
>>
>> Hi, I think you are on the correct track. You can put all your params in a
>> suitable data structure like an array or dict and pass this structure as a
>> single param to your udf.
>>
>> On Fri, 22 Dec 2017 at 2:55 pm, Aakash Basu 
>> wrote:
>>
>>> Hi,
>>>
>>> I am using Spark 2.2 using Java, can anyone please suggest me how to
>>> take more than 22 parameters in an UDF? I mean, if I want to pass all the
>>> parameters as an array of integers?
>>>
>>> Thanks,
>>> Aakash.
>>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
>


Re: /tmp fills up to 100GB when using a window function

2017-12-20 Thread Vadim Semenov
Ah, yes, I missed that part

it's `spark.local.dir`

spark.local.dir (default: /tmp): Directory to use for "scratch" space in Spark,
including map output files and RDDs that get stored on disk. This should be
on a fast, local disk in your system. It can also be a comma-separated list
of multiple directories on different disks. NOTE: In Spark 1.0 and later
this will be overridden by SPARK_LOCAL_DIRS (Standalone, Mesos) or
LOCAL_DIRS (YARN) environment variables set by the cluster manager.
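
A minimal sketch of pointing it at bigger disks (the paths are just examples, and it
has to be set before the SparkContext is created; as the note above says, on YARN the
cluster manager's LOCAL_DIRS wins for containers):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("window-job")
  .config("spark.local.dir", "/mnt1/spark-scratch,/mnt2/spark-scratch") // comma-separated list of fast local disks
  .getOrCreate()
```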

On Wed, Dec 20, 2017 at 2:58 PM, Gourav Sengupta 
wrote:

> I do think that there is an option to set the temporary shuffle location
> to a particular directory. While working with EMR I set it to /mnt1/. Let
> me know in case you are not able to find it.
>
> On Mon, Dec 18, 2017 at 8:10 PM, Mihai Iacob  wrote:
>
>> This code generates files under /tmp...blockmgr... which do not get
>> cleaned up after the job finishes.
>>
>> Anything wrong with the code below? or are there any known issues with
>> spark not cleaning up /tmp files?
>>
>>
>> window = Window.\
>>   partitionBy('***', 'date_str').\
>>   orderBy(sqlDf['***'])
>>
>> sqlDf = sqlDf.withColumn("***",rank().over(window))
>> df_w_least = sqlDf.filter("***=1")
>>
>>
>>
>>
>>
>> Regards,
>>
>> *Mihai Iacob*
>> DSX Local  - Security, IBM Analytics
>>
>> - To
>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
Until after an action is done (i.e. save/count/reduce) or if you explicitly
truncate the DAG by checkpointing.

Spark needs to keep all shuffle files because if some task/stage/node fails
it'll only need to recompute missing partitions by using already computed
parts.
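
A small sketch of the checkpointing route (the column names and the checkpoint
directory are illustrative, and Dataset.checkpoint needs Spark 2.1+): once the
DataFrame is checkpointed, the lineage behind it is cut, so the shuffle files backing
it are no longer needed for recomputation.

```
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank

val spark: SparkSession = ???
val sqlDf: DataFrame = ???

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

val window = Window.partitionBy("key", "date_str").orderBy("value")
val ranked = sqlDf.withColumn("rnk", rank().over(window)).filter("rnk = 1")

val truncated = ranked.checkpoint() // materializes the result and truncates the DAG
```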

On Tue, Dec 19, 2017 at 10:08 AM, Mihai Iacob <mia...@ca.ibm.com> wrote:

> When does spark remove them?
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local <https://datascience.ibm.com/local> - Security, IBM Analytics
>
>
>
> - Original message -
> From: Vadim Semenov <vadim.seme...@datadoghq.com>
> To: Mihai Iacob <mia...@ca.ibm.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: /tmp fills up to 100GB when using a window function
> Date: Tue, Dec 19, 2017 9:46 AM
>
> Spark doesn't remove intermediate shuffle files if they're part of the
> same job.
>
> On Mon, Dec 18, 2017 at 3:10 PM, Mihai Iacob <mia...@ca.ibm.com> wrote:
>
> This code generates files under /tmp...blockmgr... which do not get
> cleaned up after the job finishes.
>
> Anything wrong with the code below? or are there any known issues with
> spark not cleaning up /tmp files?
>
> window = Window.\
>   partitionBy('***', 'date_str').\
>   orderBy(sqlDf['***'])
>
> sqlDf = sqlDf.withColumn("***",rank().over(window))
> df_w_least = sqlDf.filter("***=1")
>
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local <https://datascience.ibm.com/local> - Security, IBM Analytics
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>


Re: /tmp fills up to 100GB when using a window function

2017-12-19 Thread Vadim Semenov
Spark doesn't remove intermediate shuffle files if they're part of the same
job.

On Mon, Dec 18, 2017 at 3:10 PM, Mihai Iacob  wrote:

> This code generates files under /tmp...blockmgr... which do not get
> cleaned up after the job finishes.
>
> Anything wrong with the code below? or are there any known issues with
> spark not cleaning up /tmp files?
>
>
> window = Window.\
>   partitionBy('***', 'date_str').\
>   orderBy(sqlDf['***'])
>
> sqlDf = sqlDf.withColumn("***",rank().over(window))
> df_w_least = sqlDf.filter("***=1")
>
>
>
>
>
> Regards,
>
> *Mihai Iacob*
> DSX Local  - Security, IBM Analytics
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: What does Blockchain technology mean for Big Data? And how Hadoop/Spark will play role with it?

2017-12-19 Thread Vadim Semenov
I think it means that we can replace HDFS with a blockchain-based FS, and
then offload some processing to smart contracts.

On Mon, Dec 18, 2017 at 11:59 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> I am looking for same answer too .. will wait for response from other
> people
>
> Sent from my iPhone
>
> > On Dec 18, 2017, at 10:56 PM, Gaurav1809 
> wrote:
> >
> > Hi All,
> >
> > Will Bigdata tools & technology work with Blockchain in future? Any
> possible
> > use cases that anyone is likely to face, please share.
> >
> > Thanks
> > Gaurav
> >
> >
> >
> > --
> > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: NullPointerException while reading a column from the row

2017-12-19 Thread Vadim Semenov
getAs defined as:

def getAs[T](i: Int): T = get(i).asInstanceOf[T]

and when you do toString you call Object.toString which doesn't depend on
the type,
so asInstanceOf[T] gets dropped by the compiler, i.e.

row.getAs[Int](0).toString -> row.get(0).toString

we can confirm that by writing a simple scala code:

import org.apache.spark.sql._
object Test {
  val row = Row(null)
  row.getAs[Int](0).toString
}

and then compiling it:

$ scalac -classpath $SPARK_HOME/jars/'*' -print test.scala
[[syntax trees at end of   cleanup]] // test.scala
package  {
  object Test extends Object {
private[this] val row: org.apache.spark.sql.Row = _;
  def row(): org.apache.spark.sql.Row = Test.this.row;
def <init>(): Test.type = {
  Test.super.<init>();
  Test.this.row =
org.apache.spark.sql.Row.apply(scala.this.Predef.genericWrapArray(Array[Object]{null}));
  *Test.this.row().getAs(0).toString();*
  ()
}
  }
}

So the proper way would be:

String.valueOf(row.getAs[Int](0))


On Tue, Dec 19, 2017 at 4:23 AM, Anurag Sharma  wrote:

> The following Scala (Spark 1.6) code for reading a value from a Row fails
> with a NullPointerException when the value is null.
>
> val test = row.getAs[Int]("ColumnName").toString
>
> while this works fine
>
> val test1 = row.getAs[Int]("ColumnName") // returns 0 for null
> val test2 = test1.toString // converts to String fine
>
> What is causing NullPointerException and what is the recommended way to
> handle such cases?
>
> PS: getting row from DataFrame as follows:
>
>  val myRDD = myDF
> .repartition(partitions)
> .mapPartitions {
>   rows =>
> rows.flatMap {
> row =>
>   functionWithRows(row) //has above logic to read null column 
> which fails
>   }
>   }
>
> functionWithRows has then above mentioned NullPointerException
>
> MyDF schema:
>
> root
>  |-- LDID: string (nullable = true)
>  |-- KTAG: string (nullable = true)
>  |-- ColumnName: integer (nullable = true)
>
>


Re: RDD[internalRow] -> DataSet

2017-12-12 Thread Vadim Semenov
not possible, but you can add your own object in your project to the
spark's package that would give you access to private methods

package org.apache.spark.sql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType

object DataFrameUtil {
  /**
   * Creates a DataFrame out of RDD[InternalRow] that you can get
   * using `df.queryExecution.toRdd`
   */
  def createFromInternalRows(sparkSession: SparkSession, schema: StructType,
                             rdd: RDD[InternalRow]): DataFrame = {
    val logicalPlan = LogicalRDD(schema.toAttributes, rdd)(sparkSession)
    Dataset.ofRows(sparkSession, logicalPlan)
  }
}


Re: JDK1.8 for spark workers

2017-11-29 Thread Vadim Semenov
You can pass `JAVA_HOME` environment variable

`spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0`
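
A sketch of how that can look (the JDK path is an example and has to exist on every
node; for `--deploy-mode cluster` the same keys are better passed with `--conf` at
submit time so the YARN application master picks them up as well):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.executorEnv.JAVA_HOME", "/usr/lib/jvm/java-1.8.0")       // executors
  .config("spark.yarn.appMasterEnv.JAVA_HOME", "/usr/lib/jvm/java-1.8.0") // YARN application master
  .getOrCreate()
```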

On Wed, Nov 29, 2017 at 10:54 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am running cloudera version of spark2.1 and our cluster is on JDK1.7.
> For some of the libraries, I need JDK1.8, is there a way to set to run
> Spark worker in JDK1.8 without upgrading .
>
> I was able run driver in JDK 1.8 by setting the path but not the workers.
>
> 17/11/28 20:22:27 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 1.0 (TID 1, brksvl267.brk.navistar.com, executor 1): 
> java.lang.UnsupportedClassVersionError:
> org/wololo/geojson/GeoJSON : Unsupported major.minor version 52.0
>
> Thanks,
> Asmath
>


Re: Spark Writing to parquet directory : java.io.IOException: Disk quota exceeded

2017-11-22 Thread Vadim Semenov
The error message seems self-explanatory, try to figure out what's the disk
quota you have for your user.

On Wed, Nov 22, 2017 at 8:23 AM, Chetan Khatri 
wrote:

> Anybody reply on this ?
>
> On Tue, Nov 21, 2017 at 3:36 PM, Chetan Khatri <
> chetan.opensou...@gmail.com> wrote:
>
>>
>> Hello Spark Users,
>>
>> I am getting below error, when i am trying to write dataset to parquet
>> location. I have enough disk space available. Last time i was facing same
>> kind of error which were resolved by increasing number of cores at hyper
>> parameters. Currently result set data size is almost 400Gig with below
>> hyper parameters
>>
>> Driver memory: 4g
>> Executor Memory: 16g
>> Executor cores=12
>> num executors= 8
>>
>> Still it's failing, any Idea ? that if i increase executor memory and
>> number of executors.  it could get resolved ?
>>
>>
>> 17/11/21 04:29:37 ERROR storage.DiskBlockObjectWriter: Uncaught exception
>> while reverting partial writes to file /mapr/chetan/local/david.com/t
>> mp/hadoop/nm-local-dir/usercache/david-khurana/appcache/
>> application_1509639363072_10572/blockmgr-008604e6-37cb-
>> 421f-8cc5-e94db75684e7/12/temp_shuffle_ae885911-a1ef-
>> 404f-9a6a-ded544bb5b3c
>> java.io.IOException: Disk quota exceeded
>> at java.io.FileOutputStream.close0(Native Method)
>> at java.io.FileOutputStream.access$000(FileOutputStream.java:53)
>> at java.io.FileOutputStream$1.close(FileOutputStream.java:356)
>> at java.io.FileDescriptor.closeAll(FileDescriptor.java:212)
>> at java.io.FileOutputStream.close(FileOutputStream.java:354)
>> at org.apache.spark.storage.TimeTrackingOutputStream.close(Time
>> TrackingOutputStream.java:72)
>> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>> at net.jpountz.lz4.LZ4BlockOutputStream.close(LZ4BlockOutputStr
>> eam.java:178)
>> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>> at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
>> at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$
>> anon$2.close(UnsafeRowSerializer.scala:96)
>> at org.apache.spark.storage.DiskBlockObjectWriter$$anonfun$
>> close$2.apply$mcV$sp(DiskBlockObjectWriter.scala:108)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:
>> 1316)
>> at org.apache.spark.storage.DiskBlockObjectWriter.close(DiskBlo
>> ckObjectWriter.scala:107)
>> at org.apache.spark.storage.DiskBlockObjectWriter.revertPartial
>> WritesAndClose(DiskBlockObjectWriter.scala:159)
>> at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.s
>> top(BypassMergeSortShuffleWriter.java:234)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:85)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMap
>> Task.scala:47)
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.
>> scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> 17/11/21 04:29:37 WARN netty.OneWayOutboxMessage: Failed to send one-way
>> RPC.
>> java.io.IOException: Failed to connect to /192.168.123.43:58889
>> at org.apache.spark.network.client.TransportClientFactory.creat
>> eClient(TransportClientFactory.java:228)
>> at org.apache.spark.network.client.TransportClientFactory.creat
>> eClient(TransportClientFactory.java:179)
>> at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpc
>> Env.scala:197)
>> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:
>> 191)
>> at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:
>> 187)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.net.ConnectException: Connection refused: /
>> 192.168.123.43:58889
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>> .java:717)
>> at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect
>> (NioSocketChannel.java:224)
>> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fi
>> nishConnect(AbstractNioChannel.java:289)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEven
>> tLoop.java:528)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimiz
>> ed(NioEventLoop.java:468)
>> at 

Re: Kryo not registered class

2017-11-20 Thread Vadim Semenov
Try:

Class.forName("[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;")
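
A sketch of how the resolved class can then be handed to Kryo registration (the
`registrationRequired` flag is only there to surface missing registrations early):

```
import org.apache.spark.SparkConf

val arrayClass = Class.forName(
  "[Lorg.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$SerializableBlockLocation;")

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(arrayClass))
```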

On Sun, Nov 19, 2017 at 3:24 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Hello, I'm with spark 2.1.0 with scala and I'm registering all classes
> with kryo, and I have a  problem registering this class,
>
> org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex$
> SerializableFileStatus$SerializableBlockLocation[]
>
> I can't register with classOf[Array[Class.forName("org.apache.spark.sql.
> execution.datasources.PartitioningAwareFileIndex$SerializableFileStatus$
> SerializableBlockLocation").type]]
>
>
> I have tried as well creating a java class like register and registering
> the class as org.apache.spark.sql.execution.datasources.
> PartitioningAwareFileIndex$SerializableFileStatus$
> SerializableBlockLocation[].class;
>
> Any clue is appreciatted,
>
> Thanks.
>
>


Re: Process large JSON file without causing OOM

2017-11-15 Thread Vadim Semenov
There's a lot of off-heap memory involved in decompressing Snappy,
compressing ZLib.

Since you're running using `local[*]`, you process multiple tasks
simultaneously, so they all might consume memory.

I don't think that increasing heap will help, since it looks like you're
hitting system memory limits.

I'd suggest trying to run with `local[2]` and checking what's the memory
usage of the jvm process.
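
A minimal sketch of that experiment (paths are placeholders; everything here runs in
the service's own JVM, so the heap is whatever -Xmx the service was started with, and
`local[2]` just caps how many tasks decompress/encode at the same time):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]") // at most two concurrent tasks instead of one per core
  .appName("json-to-orc")
  .getOrCreate()

spark.read.json("/path/to/input.json.snappy")
  .write.option("compression", "zlib")
  .orc("/path/to/output.orc")
```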

On Mon, Nov 13, 2017 at 7:22 PM, Alec Swan  wrote:

> Hello,
>
> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
> format. Effectively, my Java service starts up an embedded Spark cluster
> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
> keep getting OOM errors with large (~1GB) files.
>
> I've tried different ways to reduce memory usage, e.g. by partitioning
> data with dataSet.partitionBy("customer).save(filePath), or capping
> memory usage by setting spark.executor.memory=1G, but to no vail.
>
> I am wondering if there is a way to avoid OOM besides splitting the source
> JSON file into multiple smaller ones and processing the small ones
> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
> in it's entirety before converting it to ORC (columnar)? If so, would it
> make sense to create a custom receiver that reads the Snappy file and use
> Spark streaming for ORC conversion?
>
> Thanks,
>
> Alec
>


Re: Spark based Data Warehouse

2017-11-12 Thread Vadim Semenov
It's actually quite simple to answer

> 1. Is Spark SQL and UDF, able to handle all the workloads?
Yes

> 2. What user interface did you provide for data scientist, data engineers
and analysts
Home-grown platform, EMR, Zeppelin

> What are the challenges in running concurrent queries, by many users,
over Spark SQL? Considering Spark still does not provide spill to disk, in
many scenarios, are there frequent query failures when executing concurrent
queries
You can run separate Spark Contexts, so jobs will be isolated

> Are there any open source implementations, which provide something
similar?
Yes, many.


On Sun, Nov 12, 2017 at 1:47 PM, Gourav Sengupta 
wrote:

> Dear Ashish,
> what you are asking for involves at least a few weeks of dedicated
> understanding of your used case and then it takes at least 3 to 4 months to
> even propose a solution. You can even build a fantastic data warehouse just
> using C++. The matter depends on lots of conditions. I just think that your
> approach and question needs a lot of modification.
>
> Regards,
> Gourav
>
> On Sun, Nov 12, 2017 at 6:19 PM, Phillip Henry 
> wrote:
>
>> Hi, Ashish.
>>
>> You are correct in saying that not *all* functionality of Spark is
>> spill-to-disk but I am not sure how this pertains to a "concurrent user
>> scenario". Each executor will run in its own JVM and is therefore isolated
>> from others. That is, if the JVM of one user dies, this should not effect
>> another user who is running their own jobs in their own JVMs. The amount of
>> resources used by a user can be controlled by the resource manager.
>>
>> AFAIK, you configure something like YARN to limit the number of cores and
>> the amount of memory in the cluster a certain user or group is allowed to
>> use for their job. This is obviously quite a coarse-grained approach as (to
>> my knowledge) IO is not throttled. I believe people generally use something
>> like Apache Ambari to keep an eye on network and disk usage to mitigate
>> problems in a shared cluster.
>>
>> If the user has badly designed their query, it may very well fail with
>> OOMEs but this can happen irrespective of whether one user or many is using
>> the cluster at a given moment in time.
>>
>> Does this help?
>>
>> Regards,
>>
>> Phillip
>>
>>
>> On Sun, Nov 12, 2017 at 5:50 PM, ashish rawat 
>> wrote:
>>
>>> Thanks Jorn and Phillip. My question was specifically to anyone who have
>>> tried creating a system using spark SQL, as Data Warehouse. I was trying to
>>> check, if someone has tried it and they can help with the kind of workloads
>>> which worked and the ones, which have problems.
>>>
>>> Regarding spill to disk, I might be wrong but not all functionality of
>>> spark is spill to disk. So it still doesn't provide DB like reliability in
>>> execution. In case of DBs, queries get slow but they don't fail or go out
>>> of memory, specifically in concurrent user scenarios.
>>>
>>> Regards,
>>> Ashish
>>>
>>> On Nov 12, 2017 3:02 PM, "Phillip Henry" 
>>> wrote:
>>>
>>> Agree with Jorn. The answer is: it depends.
>>>
>>> In the past, I've worked with data scientists who are happy to use the
>>> Spark CLI. Again, the answer is "it depends" (in this case, on the skills
>>> of your customers).
>>>
>>> Regarding sharing resources, different teams were limited to their own
>>> queue so they could not hog all the resources. However, people within a
>>> team had to do some horse trading if they had a particularly intensive job
>>> to run. I did feel that this was an area that could be improved. It may be
>>> by now, I've just not looked into it for a while.
>>>
>>> BTW I'm not sure what you mean by "Spark still does not provide spill to
>>> disk" as the FAQ says "Spark's operators spill data to disk if it does not
>>> fit in memory" (http://spark.apache.org/faq.html). So, your data will
>>> not normally cause OutOfMemoryErrors (certain terms and conditions may
>>> apply).
>>>
>>> My 2 cents.
>>>
>>> Phillip
>>>
>>>
>>>
>>> On Sun, Nov 12, 2017 at 9:14 AM, Jörn Franke 
>>> wrote:
>>>
 What do you mean all possible workloads?
 You cannot prepare any system to do all possible processing.

 We do not know the requirements of your data scientists now or in the
 future so it is difficult to say. How do they work currently without the
 new solution? Do they all work on the same data? I bet you will receive on
 your email a lot of private messages trying to sell their solution that
 solves everything - with the information you provided this is impossible to
 say.

 Then with every system: have incremental releases but have then in
 short time frames - do not engineer a big system that you will deliver in 2
 years. In the cloud you have the perfect possibility to scale feature but
 also infrastructure wise.

 Challenges with concurrent queries is 

Re: Is there a difference between df.cache() vs df.rdd.cache()

2017-10-13 Thread Vadim Semenov
When you do `Dataset.rdd` you actually create a new job

here you can see what it does internally:
https://github.com/apache/spark/blob/master/sql/core/
src/main/scala/org/apache/spark/sql/Dataset.scala#L2816-L2828
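
A small sketch that shows the practical consequence (the numbers are arbitrary):
caching the RDD returned by `df.rdd` does not make DataFrame actions any faster,
because the DataFrame's plan never reads from that RDD.

```
import org.apache.spark.sql.SparkSession

val spark: SparkSession = ???
val df = spark.range(1000000).toDF("id")

df.rdd.cache()  // caches the deserialized RDD conversion of df, not df itself
df.count()      // still executed from the original plan, ignores that cache

df.cache()      // marks the Dataset/DataFrame itself as cacheable
df.count()      // materializes the cache
df.count()      // now served from the cached columnar data
```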



On Fri, Oct 13, 2017 at 5:24 PM, Supun Nakandala 
wrote:

> Hi Weichen,
>
> Thank you for the reply.
>
> My understanding was Dataframe API is using the old RDD implementation
> under the covers though it presents a different API. And calling
> df.rdd will simply give access to the underlying RDD. Is this assumption
> wrong? I would appreciate if you can shed more insights on this issue or
> point me to documentation where I can learn them.
>
> Thank you in advance.
>
> On Fri, Oct 13, 2017 at 3:19 AM, Weichen Xu 
> wrote:
>
>> You should use `df.cache()`
>> `df.rdd.cache()` won't work, because `df.rdd` generate a new RDD from the
>> original `df`. and then cache the new RDD.
>>
>> On Fri, Oct 13, 2017 at 3:35 PM, Supun Nakandala <
>> supun.nakand...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have been experimenting with cache/persist/unpersist methods with
>>> respect to both Dataframes and RDD APIs. However, I am experiencing
>>> different behaviors with the Dataframe API compared to the RDD API, such as
>>> Dataframes not getting cached when count() is called.
>>>
>>> Is there a difference between how these operations act wrt to Dataframe
>>> and RDD APIs?
>>>
>>> Thank You.
>>> -Supun
>>>
>>
>>
>


Re: EMR: Use extra mounted EBS volumes for spark.local.dir

2017-10-10 Thread Vadim Semenov
that's probably better directed to AWS support

On Sun, Oct 8, 2017 at 9:54 PM, Tushar Sudake  wrote:

> Hello everyone,
>
> I'm using 'r4.8xlarge' instances on EMR for my Spark Application.
> To each node, I'm attaching one 512 GB EBS volume.
>
> By logging in into nodes I tried verifying that this volume is being set
> for 'spark.local.dir' by EMR automatically, but couldn't find any such
> configuration.
>
> Can someone please confirm this? Do I need to do it myself though
> bootstrap steps?
>
> Thanks,
> Tushar
>


Re: Unable to run Spark Jobs in yarn cluster mode

2017-10-10 Thread Vadim Semenov
Try increasing the `spark.yarn.am.waitTime` parameter; it's by default set
to 100s (100000 ms), which might not be enough in certain cases.

On Tue, Oct 10, 2017 at 7:02 AM, Debabrata Ghosh 
wrote:

> Hi All,
> I am constantly hitting an error : "ApplicationMaster:
> SparkContext did not initialize after waiting for 100 ms" while running my
> Spark code in yarn cluster mode.
>
> Here is the command what I am using :* spark-submit --master yarn
> --deploy-mode cluster spark_code.py*
>
>  Please can you help me with a resolution at your
> convenience.
>
> Thanks in advance !
>
> Cheers,
>
> Debu
>
>
>


Re: how do you deal with datetime in Spark?

2017-10-03 Thread Vadim Semenov
I usually check the list of Hive UDFs as Spark has implemented almost all
of them
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-DateFunctions

Or/and check `org.apache.spark.sql.functions` directly:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/functions.html

Also you can check the list of all Datetime functions here
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala#L368-L399

and what they do here
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
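
For a quick taste, a sketch with a handful of those built-ins on a string timestamp
column (the column names and format are made up; `to_timestamp` needs Spark 2.2):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark: SparkSession = ???
import spark.implicits._

val events = Seq("2017-10-03 13:45:12").toDF("ts_str")

val parsed = events
  .withColumn("ts", to_timestamp($"ts_str", "yyyy-MM-dd HH:mm:ss"))
  .withColumn("day", to_date($"ts"))
  .withColumn("hour", hour($"ts"))
  .withColumn("day_of_week", date_format($"ts", "E"))
  .withColumn("week_later", date_add($"ts", 7))
```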



On Tue, Oct 3, 2017 at 1:43 PM, Adaryl Wakefield <
adaryl.wakefi...@hotmail.com> wrote:

> I gave myself a project to start actually writing Spark programs. I’m
> using Scala and Spark 2.2.0. In my project, I had to do some grouping and
> filtering by dates. It was awful and took forever. I was trying to use
> dataframes and SQL as much as possible. I see that there are date functions
> in the dataframe API but trying to use them was frustrating. Even following
> code samples was a headache because apparently the code is different
> depending on which version of Spark you are using. I was really hoping for
> a rich set of date functions like you’d find in T-SQL but I never really
> found them.
>
>
>
> Is there a best practice for dealing with dates and time in Spark? I feel
> like taking a date/time string and converting it to a date/time object and
> then manipulating data based on the various components of the timestamp
> object (hour, day, year etc.) should be a heck of a lot easier than what
> I’m finding and perhaps I’m just not looking in the right place.
>
>
>
> You can see my work here: https://github.com/BobLovesData/Apache-Spark-In-
> 24-Hours/blob/master/src/net/massstreet/hour10/BayAreaBikeAnalysis.scala
>
>
>
> Adaryl "Bob" Wakefield, MBA
> Principal
> Mass Street Analytics, LLC
> 913.938.6685 <(913)%20938-6685>
>
> www.massstreet.net
>
> www.linkedin.com/in/bobwakefieldmba
> Twitter: @BobLovesData 
>
>
>
>
>


Re: Multiple filters vs multiple conditions

2017-10-03 Thread Vadim Semenov
Since you're using Dataset API or RDD API, they won't be fused together by
the Catalyst optimizer unless you use the DF API.
Two filters will get executed within one stage, and there'll be very small
overhead on having two separate filters vs having only one.
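
An easy way to see the DataFrame-API case for yourself (a sketch; in both cases the
optimized plan should show a single Filter with the combined predicate):

```
import org.apache.spark.sql.SparkSession

val spark: SparkSession = ???
import spark.implicits._

val df = spark.range(100).toDF("x").withColumn("y", $"x" + 1)

df.filter($"x" === 1 && $"y" === 2).explain(true)
df.filter($"x" === 1).filter($"y" === 2).explain(true)
// both optimized plans contain one combined Filter over the two conditions
```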

On Tue, Oct 3, 2017 at 8:14 AM, Ahmed Mahmoud  wrote:

> Hi All,
>
> Just a quick question from an optimisation point of view:
>
> Approach 1:
> .filter (t-> t.x=1 && t.y=2)
>
> Approach 2:
> .filter (t-> t.x=1)
> .filter (t-> t.y=2)
>
> Is there a difference or one is better than the other  or both are same?
>
> Thanks!
> Ahmed Mahmoud
>
>


Re: More instances = slower Spark job

2017-09-29 Thread Vadim Semenov
Hi Jeroen,

> However, am I correct in assuming that all the filtering will be then
performed on the driver (since the .gz files are not splittable), albeit in
several threads?

Filtering will not happen on the driver, it'll happen on executors, since
`spark.read.json(…).filter(…).write(…)` is a separate job. But you have to
submit each job in a separate thread, because each thread will get locked
until the corresponding job finishes, so that's why you have to use
`parallel collections`, you could also just use Futures, but it's just
easier to use a `ParArray`.

Internally it will work this way: once one task finishes decompressing a
file, many tasks will get scheduled (based on `spark.default.parallelism`),
and the executor that decompressed the file will start processing lines
using all available threads, and after some time additional executors may
join (based on the locality levels), and then after filtering, you would
have to repartition back to 1 partition, so you could write just one
`.gzip` file.

And for each file, there will be a separate job, but because they all run
within one Spark Context, executors will stay with the job, and will work
on all files simultaneously.
See more about scheduling within one application:
https://spark.apache.org/docs/2.2.0/job-scheduling.html#
scheduling-within-an-application

On Fri, Sep 29, 2017 at 12:58 PM, Jeroen Miller <bluedasya...@gmail.com>
wrote:

> On Thu, Sep 28, 2017 at 11:55 PM, Jeroen Miller <bluedasya...@gmail.com>
> wrote:
> > On Thu, Sep 28, 2017 at 9:16 PM, Vadim Semenov
> > <vadim.seme...@datadoghq.com> wrote:
> >> Instead of having one job, you can try processing each file in a
> separate
> >> job, but run multiple jobs in parallel within one SparkContext.
>
> Hello Vadim,
>
> Today was a bit busy and I did not have the time to play with your
> idea. However, am I correct in assuming that all the filtering will be
> then performed on the driver (since the .gz files are not splittable),
> albeit in several threads?
>
> If this is correct, then I guess the proper way to tackle this task
> would be to run without any executors, but using all the cores and
> memory of the machine for the driver?
>
> I will keep you posted on my progress,
>
> Thanks,
>
> Jeroen
>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread Vadim Semenov
As alternative: checkpoint the dataframe, collect days, and then delete
corresponding directories using hadoop FileUtils, then write the dataframe
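
A rough sketch of that approach (the paths, the `day` column, and the checkpoint dir
are illustrative; re-running the same month stays idempotent because the touched
partitions are deleted first):

```
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark: SparkSession = ???
import spark.implicits._

val out = "hdfs:///warehouse/dataset.parquet"
val monthDf: DataFrame = ???

spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")
val df = monthDf.checkpoint() // avoid recomputing the input while we inspect it

val days = df.select("day").distinct().as[String].collect() // assumes day is a string like 2017-01-01

val fs = new Path(out).getFileSystem(spark.sparkContext.hadoopConfiguration)
days.foreach(d => fs.delete(new Path(s"$out/day=$d"), true)) // drop only the touched partitions

df.write.partitionBy("day").mode("append").parquet(out)
```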

On Fri, Sep 29, 2017 at 10:31 AM, peay  wrote:

> Hello,
>
> I am trying to use data_frame.write.partitionBy("day").save("dataset.parquet")
> to write a dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/ 
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job
> to process a given month twice, I'll get duplicate data in all the
> subfolders for that month.
>
> Is there a way to do "append in terms of the subfolders from partitionBy,
> but overwrite within each such partitions? Any help would be appreciated.
>
> Thanks!
>


Re: HDFS or NFS as a cache?

2017-09-29 Thread Vadim Semenov
How many files do you produce? I believe it spends a lot of time on renaming
the files because of the output committer.
Also instead of 5x c3.2xlarge try using 2x c3.8xlarge instead because they
have 10GbE and you can get good throughput for S3.
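
One more thing worth checking, as an assumption on my side rather than something
measured here: the "version 2" FileOutputCommitter commits task output directly to the
destination, so the job-commit rename pass mostly goes away, and writing fewer, larger
files also reduces the number of S3 renames.

```
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder()
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()

val df: DataFrame = ???
df.coalesce(16) // fewer, bigger parquet files -> fewer S3 renames (16 is arbitrary)
  .write.parquet("s3a://bucket/parquet")
```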

On Fri, Sep 29, 2017 at 9:15 AM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> I have a small EC2 cluster with 5 c3.2xlarge nodes and I want to write
> parquet files to S3. But the S3 performance for various reasons is bad when
> I access s3 through the parquet write method:
>
> df.write.parquet('s3a://bucket/parquet')
>
> Now I want to setup a small cache for the parquet output. One output is
> about 12-15 GB in size. Would it be enough to setup a NFS-directory on the
> master, write the output to it and then move it to S3? Or should I setup a
> HDFS on the Master? Or should I even opt for an additional cluster running
> a HDFS solution on more than one node?
>
> thanks!
>


Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
Something like this:

```
// The model is loaded lazily, once per executor JVM, the first time any task
// on that executor touches it.
object Model {
  @transient lazy val modelObject = new ModelLoader("model-filename")

  def get() = modelObject
}

object SparkJob {
  def main(args: Array[String]) = {
    // ships the file to every executor; resolve the local copy with
    // SparkFiles.get("model-filename") inside ModelLoader if needed
    sc.addFile("s3://bucket/path/model-filename")

    sc.parallelize(…).map(test => {
      Model.get().use(…)
    })
  }
}
```

On Thu, Sep 28, 2017 at 3:49 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> as an alternative
> ```
> spark-submit --files 
> ```
>
> the files will be put on each executor in the working directory, so you
> can then load it alongside your `map` function
>
> Behind the scene it uses `SparkContext.addFile` method that you can use
> too
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/SparkContext.scala?utf8=✓#L1508-L1558
>
> On Wed, Sep 27, 2017 at 10:08 PM, Naveen Swamy <mnnav...@gmail.com> wrote:
>
>> Hello all,
>>
>> I am a new user to Spark, please bear with me if this has been discussed
>> earlier.
>>
>> I am trying to run batch inference using DL frameworks pre-trained models
>> and Spark. Basically, I want to download a model(which is usually ~500 MB)
>> onto the workers and load the model and run inference on images fetched
>> from the source like S3 something like this
>> rdd = sc.parallelize(load_from_s3)
>> rdd.map(fetch_from_s3).map(read_file).map(predict)
>>
>> I was able to get it running in local mode on Jupyter, However, I would
>> like to load the model only once and not every map operation. A setup hook
>> would have nice which loads the model once into the JVM, I came across this
>> JIRA https://issues.apache.org/jira/browse/SPARK-650  which suggests
>> that I can use Singleton and static initialization. I tried to do this
>> using a Singleton metaclass following the thread here
>> https://stackoverflow.com/questions/6760685/creating-a-singl
>> eton-in-python. Following this failed miserably complaining that Spark
>> cannot serialize ctype objects with pointer references.
>>
>> After a lot of trial and error, I moved the code to a separate file by
>> creating a static method for predict that checks if a class variable is set
>> or not and loads the model if not set. This approach does not sound thread
>> safe to me, So I wanted to reach out and see if there are established
>> patterns on how to achieve something like this.
>>
>>
>> Also, I would like to understand the executor->tasks->python process
>> mapping, Does each task gets mapped to a separate python process?  The
>> reason I ask is I want to be to use mapPartition method to load a batch of
>> files and run inference on them separately for which I need to load the
>> object once per task. Any
>>
>>
>> Thanks for your time in answering my question.
>>
>> Cheers, Naveen
>>
>>
>>
>


Re: Loading objects only once

2017-09-28 Thread Vadim Semenov
as an alternative
```
spark-submit --files 
```

the files will be put on each executor in the working directory, so you can
then load it alongside your `map` function

Behind the scene it uses `SparkContext.addFile` method that you can use too
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala?utf8=✓#L1508-L1558

On Wed, Sep 27, 2017 at 10:08 PM, Naveen Swamy  wrote:

> Hello all,
>
> I am a new user to Spark, please bear with me if this has been discussed
> earlier.
>
> I am trying to run batch inference using DL frameworks pre-trained models
> and Spark. Basically, I want to download a model(which is usually ~500 MB)
> onto the workers and load the model and run inference on images fetched
> from the source like S3 something like this
> rdd = sc.parallelize(load_from_s3)
> rdd.map(fetch_from_s3).map(read_file).map(predict)
>
> I was able to get it running in local mode on Jupyter, However, I would
> like to load the model only once and not every map operation. A setup hook
> would have nice which loads the model once into the JVM, I came across this
> JIRA https://issues.apache.org/jira/browse/SPARK-650  which suggests that
> I can use Singleton and static initialization. I tried to do this using
> a Singleton metaclass following the thread here
> https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python.
> Following this failed miserably complaining that Spark cannot serialize
> ctype objects with pointer references.
>
> After a lot of trial and error, I moved the code to a separate file by
> creating a static method for predict that checks if a class variable is set
> or not and loads the model if not set. This approach does not sound thread
> safe to me, So I wanted to reach out and see if there are established
> patterns on how to achieve something like this.
>
>
> Also, I would like to understand the executor->tasks->python process
> mapping, Does each task gets mapped to a separate python process?  The
> reason I ask is I want to be to use mapPartition method to load a batch of
> files and run inference on them separately for which I need to load the
> object once per task. Any
>
>
> Thanks for your time in answering my question.
>
> Cheers, Naveen
>
>
>


Re: Massive fetch fails, io errors in TransportRequestHandler

2017-09-28 Thread Vadim Semenov
Looks like there's slowness in sending shuffle files, maybe one executor
get overwhelmed with all the other executors trying to pull data?
Try lifting `spark.network.timeout` further, we ourselves had to push it to
600s from the default 120s
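
A sketch of how these can be set before the context is created (600s is the value that
worked for us for the network timeout; the retry knobs are extra things to try, and
their values here are only examples):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.network.timeout", "600s")
  .config("spark.shuffle.io.maxRetries", "10") // default is 3
  .config("spark.shuffle.io.retryWait", "30s") // default is 5s
  .getOrCreate()
```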

On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov 
wrote:

> Hi,
> I see strange behaviour in my job, and can’t understand what is wrong:
> the stage that uses shuffle data as an input fails a number of times
> because of org.apache.spark.shuffle.FetchFailedException seen in spark UI:
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1,
> mapId=50192, reduceId=12698, message=
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1,
> mapId=3, reduceId=12699, message=
>
> Digging in logs I found a scenario of task failure:
> 1. some shuffle-server-X-Y (important note: external shuffle service is
> OFF) report 'Broken pipe’ at 2017-09-26T05:40:26.484Z
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(
> FileChannelImpl.java:428)
> at sun.nio.ch.FileChannelImpl.transferToDirectly(
> FileChannelImpl.java:493)
> at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
> at io.netty.channel.DefaultFileRegion.transferTo(
> DefaultFileRegion.java:139)
> at org.apache.spark.network.protocol.MessageWithHeader.
> transferTo(MessageWithHeader.java:121)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(
> NioSocketChannel.java:287)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite(
> AbstractNioByteChannel.java:237)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(
> NioSocketChannel.java:314)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(
> AbstractChannel.java:802)
> at io.netty.channel.nio.AbstractNioChannel$
> AbstractNioUnsafe.flush0(AbstractNioChannel.java:313)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(
> AbstractChannel.java:770)
>
> and "chunk send" errors:
> Error sending result 
> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=65546478185,
> chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=
> /data/1/yarn/nm/usercache/hdfs/appcache/application_
> 1505206571245_2989/blockmgr-9be47304-ffe2-443a-bb10-
> 33a89928f5b9/04/shuffle_1_3_0.data, offset=40858881, length=3208}} to
> /someClientIP:somePort; closing connection
> with exceptions:
> java.nio.channels.ClosedChannelException
> at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown
> Source)
>
> 2. then client of this shuffle-server complains with:
> Connection to some-hostname/someIP:port has been quiet for 24 ms while
> there are outstanding requests. Assuming connection is dead; please adjust
> spark.network.timeout if this is wrong.
> and then
> Still have 3386 requests outstanding when connection from
> some-hostname/someIP:11431 is closed
> and then
> java.io.IOException: Connection from 
> shuffleServerHostname/shuffleServerIP:port
> closed
> at org.apache.spark.network.client.TransportResponseHandler.
> channelInactive(TransportResponseHandler.java:146)
> at org.apache.spark.network.server.TransportChannelHandler.
> channelInactive(TransportChannelHandler.java:108)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:241)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:227)
> at io.netty.channel.AbstractChannelHandlerContext.
> fireChannelInactive(AbstractChannelHandlerContext.java:220)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
> at io.netty.handler.timeout.IdleStateHandler.channelInactive(
> IdleStateHandler.java:278)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:241)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:227)
> at io.netty.channel.AbstractChannelHandlerContext.
> fireChannelInactive(AbstractChannelHandlerContext.java:220)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:241)
> at io.netty.channel.AbstractChannelHandlerContext.
> invokeChannelInactive(AbstractChannelHandlerContext.java:227)
> at io.netty.channel.AbstractChannelHandlerContext.
> fireChannelInactive(AbstractChannelHandlerContext.java:220)
> at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(
> ChannelInboundHandlerAdapter.java:75)
>
> this fails tasks and stage for 

Re: More instances = slower Spark job

2017-09-28 Thread Vadim Semenov
Instead of having one job, you can try processing each file in a separate
job, but run multiple jobs in parallel within one SparkContext.
Something like this should work for you, it'll submit N jobs from the
driver, the jobs will run independently, but executors will dynamically
work on different jobs, so you'll utilize executors at full.

```
import org.apache.spark.sql.SparkSession

import scala.collection.parallel.ForkJoinTaskSupport

val spark: SparkSession
val files: Seq[String]
val filesParallelCollection = files.toParArray
val howManyFilesToProcessInParallel = math.min(50, files.length)

filesParallelCollection.tasksupport = new ForkJoinTaskSupport(
  new scala.concurrent.forkjoin.ForkJoinPool(howManyFilesToProcessInParallel)
)
filesParallelCollection.foreach(file => {
  spark.read.text(file).filter(…)…
})
```

On Thu, Sep 28, 2017 at 2:50 PM, Jeroen Miller 
wrote:

> More details on what I want to achieve. Maybe someone can suggest a
> course of action.
>
> My processing is extremely simple: reading .json.gz text
> files, filtering each line according a regex, and saving the surviving
> lines in a similarly named .gz file.
>
> Unfortunately changing the data format is impossible (we are dealing
> with terabytes here) so I need to process .gz files.
>
> Ideally, I would like to process a large number of such files in
> parallel, that is using n_e executors which would each take care of a
> fraction 1/n_e of all the files. The trouble is that I don't know how
> to process files in parallel without loading them in the driver first,
> which would result in a major bottleneck.
>
> Here was my naive approach in Scala-like pseudo-code:
>
> //
> // This happens on the driver
> //
> val files = List("s3://bckt/file-1.json.gz", ...,
> "s3://bckt/file-N.json.gz")
> val files_rdd = sc.parallelize(files, num_partitions)
> //
> // Now files_rdd (which only holds file names) is distributed across
> several executors
> // and/or nodes
> //
>
> files_rdd.foreach(
> //
> // It is my understanding that what is performed within the foreach
> method
> // will be parallelized on several executors / nodes
> //
> file => {
> //
> // This would happen on a given executor: a given input file
> is processed
> // entirely on a given executor
> //
> val lines = sc.read.text(file)
> val filtered_lines = lines.filter( // filter based on regex // )
> filtered_lines.write.option("compression",
> "gzip").text("a_filename_tbd")
> }
> )
>
> Unfortunately this is not possible since the Spark context sc is
> defined in the driver and cannot be shared.
>
> My problem would be entirely solved if I could manage to read files
> not from the driver, but from a given executor.
>
> Another possibility would be to read each .gz file in the driver
> (about 2GB each), materializing the whole resulting RDD on the driver
> (around 12GB) and then calling repartition on that RDD, but only the
> regex part would be parallelized, and the data shuffling will probably
> ruin the performance.
>
> Any idea?
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: What are factors need to Be considered when upgrading to Spark 2.1.0 from Spark 1.6.0

2017-09-22 Thread Vadim Semenov
1. 40s is pretty negligible unless you run your job very frequently; there
can be many factors that influence that.

2. Try to compare the CPU time instead of the wall-clock time

3. Check the stages that got slower and compare the DAGs

4. Test with dynamic allocation disabled

On Fri, Sep 22, 2017 at 2:39 PM, Gokula Krishnan D 
wrote:

> Hello All,
>
> Currently our Batch ETL Jobs are in Spark 1.6.0 and planning to upgrade
> into Spark 2.1.0.
>
> With minor code changes (like configuration and Spark Session.sc) able to
> execute the existing JOB into Spark 2.1.0.
>
> But noticed that JOB completion timings are much better in Spark 1.6.0 but
> not in Spark 2.1.0.
>
> For instance, JOB A completed in 50s in Spark 1.6.0.
>
> And with the same input, JOB A completed in 1.5 mins in Spark 2.1.0.
>
> Is there any specific factor needs to be considered when switching to
> Spark 2.1.0 from Spark 1.6.0.
>
>
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>


Re: SVD computation limit

2017-09-19 Thread Vadim Semenov
This may also be related to
https://issues.apache.org/jira/browse/SPARK-22033

On Tue, Sep 19, 2017 at 3:40 PM, Mark Bittmann  wrote:

> I've run into this before. The EigenValueDecomposition creates a Java
> Array with 2*k*n elements. The Java Array is indexed with a native integer
> type, so 2*k*n cannot exceed Integer.MAX_VALUE values.
>
> The array is created here:
> https://github.com/apache/spark/blob/master/mllib/src/main/
> scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala#L84
>
> If you remove the requirement that 2*k*n < Integer.MAX_VALUE, it will fail
> with java.lang.NegativeArraySizeException. More on this issue here:
> https://issues.apache.org/jira/browse/SPARK-5656
>
> On Tue, Sep 19, 2017 at 9:49 AM, Alexander Ovcharenko <
> shurik@gmail.com> wrote:
>
>> Hello guys,
>>
>> While trying to compute SVD using computeSVD() function, i am getting the
>> following warning with the follow up exception:
>> 17/09/14 12:29:02 WARN RowMatrix: computing svd with k=49865 and
>> n=191077, please check necessity
>> IllegalArgumentException: u'requirement failed: k = 49865 and/or n =
>> 191077 are too large to compute an eigendecomposition'
>>
>> When I try to compute first 3000 singular values, I'm getting several
>> following warnings every second:
>> 17/09/14 13:43:38 WARN TaskSetManager: Stage 4802 contains a task of very
>> large size (135 KB). The maximum recommended task size is 100 KB.
>>
>> The matrix size is 49865 x 191077 and all the singular values are needed.
>>
>> Is there a way to lift that limit and be able to compute whatever number
>> of singular values?
>>
>> Thank you.
>>
>>
>>
>


Re: Configuration for unit testing and sql.shuffle.partitions

2017-09-18 Thread Vadim Semenov
you can create a Super class "FunSuiteWithSparkContext" that's going to
create a Spark sessions, Spark context, and SQLContext with all the desired
properties.
Then you add the class to all the relevant test suites, and that's pretty
much it.

The other option can be is to pass it as a VM parameter like
`-Dspark.driver.memory=2g -Xmx3G -Dspark.master=local[3]`

For example, if you run your tests with sbt:

```
SBT_OPTS="-Xmx3G -Dspark.driver.memory=1536m" sbt test
```
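
Coming back to the base-class option, a minimal sketch (the trait name and values are
only examples; the small `spark.sql.shuffle.partitions` is what usually makes tiny
local runs noticeably faster):

```
import org.apache.spark.sql.SparkSession
import org.scalatest.{BeforeAndAfterAll, FunSuite}

trait FunSuiteWithSparkContext extends FunSuite with BeforeAndAfterAll {
  @transient var spark: SparkSession = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    spark = SparkSession.builder()
      .master("local[2]")
      .appName("unit-tests")
      .config("spark.sql.shuffle.partitions", "4") // instead of the default 200
      .config("spark.ui.enabled", "false")
      .getOrCreate()
  }

  override def afterAll(): Unit = {
    if (spark != null) spark.stop()
    super.afterAll()
  }
}
```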

On Sat, Sep 16, 2017 at 2:54 PM, Femi Anthony  wrote:

> How are you specifying it, as an option to spark-submit ?
>
> On Sat, Sep 16, 2017 at 12:26 PM, Akhil Das  wrote:
>
>> spark.sql.shuffle.partitions is still used I believe. I can see it in the
>> code and in the documentation page.
>>
>> On Wed, Sep 13, 2017 at 4:46 AM, peay  wrote:
>>
>>> Hello,
>>>
>>> I am running unit tests with Spark DataFrames, and I am looking for
>>> configuration tweaks that would make tests faster. Usually, I use a
>>> local[2] or local[4] master.
>>>
>>> Something that has been bothering me is that most of my stages end up
>>> using 200 partitions, independently of whether I repartition the input.
>>> This seems a bit overkill for small unit tests that barely have 200 rows
>>> per DataFrame.
>>>
>>> spark.sql.shuffle.partitions used to control this I believe, but it
>>> seems to be gone and I could not find any information on what
>>> mechanism/setting replaces it or the corresponding JIRA.
>>>
>>> Has anyone experience to share on how to tune Spark best for very small
>>> local runs like that?
>>>
>>> Thanks!
>>>
>>>
>>
>>
>> --
>> Cheers!
>>
>>
>
>
> --
> http://www.femibyte.com/twiki5/bin/view/Tech/
> http://www.nextmatrix.com
> "Great spirits have always encountered violent opposition from mediocre
> minds." - Albert Einstein.
>


Re: Bizarre UI Behavior after migration

2017-09-10 Thread Vadim Semenov
Was checking mails I sent, and wanted to get back to this one in case
someone gets the same question.

We found out that the reason why we saw stages shown as complete without all
tasks being complete is related to issues in the ListenerBus

We had to tune the event queue size, see this
https://issues.apache.org/jira/browse/SPARK-15703

and we had to disable `eventLog` completely in some cases because of this
https://issues.apache.org/jira/browse/SPARK-21460
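
For reference, a sketch of those two knobs (the queue-size key is
`spark.scheduler.listenerbus.eventqueue.size` up to Spark 2.2 and was renamed to
`...eventqueue.capacity` in later versions; 100000 is only an example value):

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.scheduler.listenerbus.eventqueue.size", "100000") // default is 10000
  .config("spark.eventLog.enabled", "false") // what we had to do in some cases
  .getOrCreate()
```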

Facebook did some improvements to that, which are discussed here and in the
related PRs https://issues.apache.org/jira/browse/SPARK-18838

You can also see them discussing that at the Spark Summit SF 2017
https://www.youtube.com/watch?v=5dga0UT4RI8





On Mon, May 22, 2017 at 8:35 PM, Miles Crawford <mil...@allenai.org> wrote:

> Well, what's happening here is that jobs become "un-finished" - they
> complete, and then later on pop back into the "Active" section showing a
> small number of complete/inprogress tasks.
>
> In my screenshot, Job #1 completed as normal, and then later on switched
> back to active with only 92 tasks... it never seems to change again, it's
> stuck in this frozen, active state.
>
>
> On Mon, May 22, 2017 at 12:50 PM, Vadim Semenov <
> vadim.seme...@datadoghq.com> wrote:
>
>> I believe it shows only the tasks that have actually being executed, if
>> there were tasks with no data, they don't get reported.
>>
>> I might be mistaken, if somebody has a good explanation, would also like
>> to hear.
>>
>> On Fri, May 19, 2017 at 5:45 PM, Miles Crawford <mil...@allenai.org>
>> wrote:
>>
>>> Hey ya'll,
>>>
>>> Trying to migrate from Spark 1.6.1 to 2.1.0.
>>>
>>> I use EMR, and launched a new cluster using EMR 5.5, which runs spark
>>> 2.1.0.
>>>
>>> I updated my dependencies, and fixed a few API changes related to
>>> accumulators, and presto! my application was running on the new cluster.
>>>
>>> But the application UI shows crazy output:
>>> https://www.dropbox.com/s/egtj1056qeudswj/sparkwut.png?dl=0
>>>
>>> The applications seem to complete successfully, but I was wondering if
>>> anyone has an idea of what might be going wrong?
>>>
>>> Thanks,
>>> -Miles
>>>
>>
>>
>


Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-29 Thread Vadim Semenov
Each java process for each of the executors has some environment variables
that you can use, for example:

> CONTAINER_ID=container_1503994094228_0054_01_13

The executor id gets passed as an argument to the process:

> /usr/lib/jvm/java-1.8.0/bin/java … --driver-url
spark://CoarseGrainedScheduler@:38151 *--executor-id 3 *--hostname ip-1…

And it gets printed out in the container log:

> 17/08/29 13:02:00 INFO Executor: Starting executor ID 3 on host …
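
As for MDC: a sketch of what can be done on the executor side with log4j 1.x (which is
what json-event-layout typically sits on). MDC is per-thread, so in practice you would
call this at the start of each task, e.g. from a mapPartitions, and the key names are
up to you:

```
import org.apache.log4j.MDC
import org.apache.spark.SparkEnv

def registerMdc(): Unit = {
  sys.env.get("CONTAINER_ID").foreach { containerId =>
    MDC.put("containerId", containerId)
    // container_1503994094228_0054_01_000013 -> application_1503994094228_0054
    MDC.put("applicationId", containerId.split("_").slice(1, 3).mkString("application_", "_", ""))
  }
  MDC.put("executorId", SparkEnv.get.executorId)
}
```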



On Mon, Aug 28, 2017 at 5:41 PM, Mikhailau, Alex <alex.mikhai...@mlb.com>
wrote:

> Thanks, Vadim. The issue is not access to logs. I am able to view them.
>
>
>
> I have cloudwatch logs agent push logs to elasticsearch and then into
> Kibana using json-event-layout for log4j output. I would like to also log
> applicationId, executorId, etc in those log statements for clarity. Is
> there an MDC way with Spark or some other way to achieve this?
>
>
>
> Alex
>
>
>
> *From: *Vadim Semenov <vadim.seme...@datadoghq.com>
> *Date: *Monday, August 28, 2017 at 5:18 PM
> *To: *"Mikhailau, Alex" <alex.mikhai...@mlb.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Referencing YARN application id, YARN container hostname,
> Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log
> statements?
>
>
>
> When you create a EMR cluster you can specify a S3 path where logs will be
> saved after cluster, something like this:
>
>
>
> s3://bucket/j-18ASDKLJLAKSD/containers/application_
> 1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz
>
>
>
> http://docs.aws.amazon.com/emr/latest/ManagementGuide/
> emr-manage-view-web-log-files.html
>
>
>
> On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex <alex.mikhai...@mlb.com>
> wrote:
>
> Does anyone have a working solution for logging YARN application id, YARN
> container hostname, Executor ID and YARN attempt for jobs running on Spark
> EMR 5.7.0 in log statements? Are there specific ENV variables available or
> other workflow for doing that?
>
>
>
> Thank you
>
>
>
> Alex
>
>
>


Re: Referencing YARN application id, YARN container hostname, Executor ID and YARN attempt for jobs running on Spark EMR 5.7.0 in log statements?

2017-08-28 Thread Vadim Semenov
When you create a EMR cluster you can specify a S3 path where logs will be
saved after cluster, something like this:

s3://bucket/j-18ASDKLJLAKSD/containers/application_1494074597524_0001/container_1494074597524_0001_01_01/stderr.gz

http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-manage-view-web-log-files.html

On Mon, Aug 28, 2017 at 4:43 PM, Mikhailau, Alex 
wrote:

> Does anyone have a working solution for logging YARN application id, YARN
> container hostname, Executor ID and YARN attempt for jobs running on Spark
> EMR 5.7.0 in log statements? Are there specific ENV variables available or
> other workflow for doing that?
>
>
>
> Thank you
>
>
>
> Alex
>


Re: Collecting Multiple Aggregation query result on one Column as collectAsMap

2017-08-28 Thread Vadim Semenov
I didn't tailor it to your needs, but this is what I can offer you, the
idea should be pretty clear

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, struct}

val spark: SparkSession

import spark.implicits._

case class Input(
  a: Int,
  b: Long,
  c: Int,
  d: Int,
  e: String
)

case class Output(
  a: Int,
  b: Long,
  data: Seq[(Int, Int, String)]
)

val df = spark.createDataFrame(Seq(
  Input(1, 1, 1, 1, "a"),
  Input(1, 1, 1, 1, "b"),
  Input(1, 1, 1, 1, "c"),
  Input(1, 2, 3, 3, "d")
))

val dfOut = df.groupBy("a", "b")
  .agg(collect_list(struct($"c", $"d", $"e")))
  .queryExecution.toRdd.mapPartitions(_.map(r => {
val a = r.getInt(0)
val b = r.getLong(1)

val list = r.getArray(2)

Output(
  a,
  b,
  (0 until list.numElements()).map(i => {
val struct = list.getStruct(i, 3)
val c = struct.getInt(0)
val d = struct.getInt(1)
val e = struct.getString(2)

(c, d, e)
  })
)
  })).toDF()
dfOut.explain(extended = true)
dfOut.show()


On Mon, Aug 28, 2017 at 10:47 AM, Patrick  wrote:

> Hi
>
> I have two lists:
>
>
>- List one: contains names of columns on which I want to do aggregate
>operations.
>- List two: contains the aggregate operations on which I want to
>perform on each column eg ( min, max, mean)
>
> I am trying to use spark 2.0 dataset to achieve this. Spark provides an
> agg() where you can pass a Map  (of column name and
> respective aggregate operation ) as input, however I want to perform
> different aggregation operations on the same column of the data and want to
> collect the result in a Map where key is the aggregate
> operation and the value is the result on the particular column. If I add
> different agg() calls to the same column, the key gets updated with the latest value.
>
> Also I don't find any collectAsMap() operation that returns a map of
> aggregated column name as key and result as value. I get collectAsList(),
> but I don't know the order in which those agg() operations are run, so how do
> I match which list value corresponds to which agg operation. I am able to
> see the result using .show(), but how can I collect the result in this case?
>
> Is it possible to do different aggregation on the same column in one
> Job(i.e only one collect operation) using agg() operation?
>
>
> Thanks in advance.
>
>


Re: [Spark Core] Is it possible to insert a function directly into the Logical Plan?

2017-08-14 Thread Vadim Semenov
Something like this, maybe?


import org.apache.spark.sql.{DataFrame, Dataset, Row}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.catalyst.encoders.RowEncoder

val df: DataFrame = ???
val spark = df.sparkSession
// queryExecution.toRdd exposes the underlying RDD[InternalRow], so the hook runs
// without a serialization/deserialization round trip; `log` is assumed to be a
// serializable logger available on the executors.
val rddOfInternalRows = df.queryExecution.toRdd.mapPartitions(iter => {
  log.info("Test")
  iter
})
val attributes = df.schema.map(f =>
   AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()
)
val logicalPlan = LogicalRDD(attributes, rddOfInternalRows)(spark)
val rowEncoder = RowEncoder(df.schema)
val resultingDataFrame = new Dataset[Row](spark, logicalPlan, rowEncoder)
resultingDataFrame

On Mon, Aug 14, 2017 at 2:15 PM, Lukas Bradley 
wrote:

> We have had issues with gathering status on long running jobs.  We have
> attempted to draw parallels between the Spark UI/Monitoring API and our
> code base.  Due to the separation between code and the execution plan, even
> having a guess as to where we are in the process is difficult.  The
> Job/Stage/Task information is too abstracted from our code to be easily
> digested by non Spark engineers on our team.
>
> Is there a "hook" to which I can attach a piece of code that is triggered
> when a point in the plan is reached?  This could be when a SQL command
> completes, or when a new DataSet is created, anything really...
>
> It seems Dataset.checkpoint() offers an excellent snapshot position during
> execution, but I'm concerned I'm short-circuiting the optimal execution of
> the full plan.  I really want these trigger functions to be completely
> independent of the actual processing itself.  I'm not looking to extract
> information from a Dataset, RDD, or anything else.  I essentially want to
> write independent output for status.
>
> If this doesn't exist, is there any desire on the dev team for me to
> investigate this feature?
>
> Thank you for any and all help.
>


Re: count exceed int.MaxValue

2017-08-08 Thread Vadim Semenov
Scala doesn't support ranges with more than Int.MaxValue elements
https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/immutable/Range.scala?utf8=✓#L89

You can create two RDDs and unionize them:

scala> val rdd = sc.parallelize(1L to
Int.MaxValue.toLong).union(sc.parallelize(1L to Int.MaxValue.toLong))
rdd: org.apache.spark.rdd.RDD[Long] = UnionRDD[10] at union at :24

scala> rdd.count
[Stage 0:>  (0 + 4)
/ 8]


Also instead of creating the range on the driver, you can create your RDD
in parallel:

scala> :paste
// Entering paste mode (ctrl-D to finish)

val numberOfParts = 100
val numberOfElementsInEachPart = Int.MaxValue.toDouble / 100

val rdd = sc.parallelize(1 to numberOfParts).flatMap(partNum => {
  val begin = ((partNum - 1) * numberOfElementsInEachPart).toLong
  val end = (partNum * numberOfElementsInEachPart).toLong
  begin to end // note: the range starts at 0 and adjacent parts share a boundary element, hence the count of 2147483747 below
})

// Exiting paste mode, now interpreting.

numberOfParts: Int = 100
numberOfElementsInEachPart: Double = 2.147483647E7
rdd: org.apache.spark.rdd.RDD[Long] = MapPartitionsRDD[15] at flatMap at
:31

scala> rdd.count
res10: Long = 2147483747
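
If the goal is simply a Long-sized RDD, `sc.range` sidesteps the Scala range
limitation entirely (a small sketch; the end bound is exclusive, and the method
is available in recent Spark versions):

```
// covers 1L through 2 * Int.MaxValue inclusive
val rdd = sc.range(1L, Int.MaxValue.toLong * 2 + 1)
rdd.count()  // should come out to 2 * Int.MaxValue = 4294967294
```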

On Tue, Aug 8, 2017 at 1:26 PM, makoto  wrote:

> Hello,
> I'd like to count more than Int.MaxValue. But I encountered the following
> error.
>
> scala> val rdd = sc.parallelize(1L to Int.MaxValue*2.toLong)
> rdd: org.apache.spark.rdd.RDD[Long] = ParallelCollectionRDD[28] at
> parallelize at :24
>
> scala> rdd.count
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
>   at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
>   ... 48 elided
>
> How can I avoid the error ?
> A similar problem is as follows:
> scala> rdd.reduce((a,b)=> (a + b))
> java.lang.IllegalArgumentException: More than Int.MaxValue elements.
>   at scala.collection.immutable.NumericRange$.check$1(
> NumericRange.scala:304)
>   at scala.collection.immutable.NumericRange$.count(
> NumericRange.scala:314)
>   at scala.collection.immutable.NumericRange.numRangeElements$
> lzycompute(NumericRange.scala:52)
>   at scala.collection.immutable.NumericRange.numRangeElements(
> NumericRange.scala:51)
>   at scala.collection.immutable.NumericRange.length(NumericRange.scala:54)
>   at org.apache.spark.rdd.ParallelCollectionRDD$.slice(
> ParallelCollectionRDD.scala:145)
>   at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(
> ParallelCollectionRDD.scala:97)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
>   at scala.Option.getOrElse(Option.scala:121)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
>   ... 48 elided
>
>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
So if you just save an RDD to HDFS via 'saveAsSequenceFile', you would have
to create a new RDD that reads that data, this way you'll avoid recomputing
the RDD but may lose time on saving/loading.

Exactly same thing happens in 'checkpoint', 'checkpoint' is just a
convenient method that gives you the same RDD back, basically.

However, if your job fails, there's no way to run a new job using already
'checkpointed' data from a previous failed run. That's where having a
custom check pointer helps.

Another note: you cannot delete "checkpoint"ed data within the same job; you
need to delete it some other way.

BTW, have you tried '.persist(StorageLevel.DISK_ONLY)'? It caches data to
local disk, freeing space in the JVM and letting you avoid HDFS.
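
A minimal sketch of the save-then-reload approach described above (the path and
`MyRecord`/`op1`/`op2` are placeholders standing in for the job's own types and
functions):

```
rdd.saveAsObjectFile("hdfs:///tmp/myrdd")               // acts as the manual "checkpoint"
val reloaded = sc.objectFile[MyRecord]("hdfs:///tmp/myrdd")
val result1 = reloaded.map(op1)                         // neither of these recomputes
val result2 = reloaded.map(op2)                         // the lineage that produced `rdd`
```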

On Wednesday, August 2, 2017, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> `saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so
> it just saves data to some destination.
>
> `cache/persist` allow you to cache data and keep the DAG in case of some
> executor that holds data goes down, so Spark would still be able to
> recalculate missing partitions
>
> `localCheckpoint` allows you to sacrifice fault-tolerance and truncate the
> DAG, so if some executor goes down, the job will fail, because it has
> already forgotten the DAG. https://github.com/apache/
> spark/blob/master/core/src/main/scala/org/apache/spark/
> rdd/RDD.scala#L1551-L1610
>
> and `checkpoint` allows you to save data to some shared storage and
> truncate the DAG, so if an executor goes down, the job will be able to take
> missing partitions from the place where it saved the RDD
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549
>
> On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet <su...@acm.org
> <javascript:_e(%7B%7D,'cvml','su...@acm.org');>> wrote:
>
>> On 3 August 2017 at 01:05, jeff saremi <jeffsar...@hotmail.com
>> <javascript:_e(%7B%7D,'cvml','jeffsar...@hotmail.com');>> wrote:
>> > Vadim:
>> >
>> > This is from the Mastering Spark book:
>> >
>> > "It is strongly recommended that a checkpointed RDD is persisted in
>> memory,
>> > otherwise saving it on a file will require recomputation."
>>
>> Is this really true? I had the impression that DAG will not be carried
>> out once RDD is serialized to an external file, so 'saveAsObjectFile'
>> saves DAG as well?
>>
>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
`saveAsObjectFile` doesn't save the DAG, it acts as a typical action, so it
just saves data to some destination.

`cache/persist` allow you to cache data and keep the DAG in case of some
executor that holds data goes down, so Spark would still be able to
recalculate missing partitions

`localCheckpoint` allows you to sacrifice fault-tolerance and truncate the
DAG, so if some executor goes down, the job will fail, because it has
already forgotten the DAG.
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1551-L1610

and `checkpoint` allows you to save data to some shared storage and
truncate the DAG, so if an executor goes down, the job will be able to take
missing partitions from the place where it saved the RDD
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1549

On Wed, Aug 2, 2017 at 7:20 PM, Suzen, Mehmet  wrote:

> On 3 August 2017 at 01:05, jeff saremi  wrote:
> > Vadim:
> >
> > This is from the Mastering Spark book:
> >
> > "It is strongly recommended that a checkpointed RDD is persisted in
> memory,
> > otherwise saving it on a file will require recomputation."
>
> Is this really true? I had the impression that DAG will not be carried
> out once RDD is serialized to an external file, so 'saveAsObjectFile'
> saves DAG as well?
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
Also check the `RDD.checkpoint()` method

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1533-L1550

On Wed, Aug 2, 2017 at 8:46 PM, Vadim Semenov <vadim.seme...@datadoghq.com>
wrote:

> I'm not sure that "checkpointed" means the same thing in that sentence.
>
> You can run a simple test using `spark-shell`:
>
> sc.setCheckpointDir("/tmp/checkpoint")
> val rdd = sc.parallelize(1 to 10).map(x => {
>   Thread.sleep(1000)
>   x
> })
> rdd.checkpoint()
> rdd.foreach(println) // Will take 10 seconds
> rdd.foreach(println) // Will be instant, because the RDD is checkpointed
>
> On Wed, Aug 2, 2017 at 7:05 PM, jeff saremi <jeffsar...@hotmail.com>
> wrote:
>
>> Vadim:
>>
>> This is from the Mastering Spark book:
>>
>> *"It is strongly recommended that a checkpointed RDD is persisted in
>> memory, otherwise saving it on a file will require recomputation."*
>>
>>
>> To me that means checkpoint will not prevent the recomputation that i was
>> hoping for
>> --
>> *From:* Vadim Semenov <vadim.seme...@datadoghq.com>
>> *Sent:* Tuesday, August 1, 2017 12:05:17 PM
>> *To:* jeff saremi
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: How can i remove the need for calling cache
>>
>> You can use `.checkpoint()`:
>> ```
>> val sc: SparkContext
>> sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
>> myrdd.checkpoint()
>> val result1 = myrdd.map(op1(_))
>> result1.count() // Will save `myrdd` to HDFS and do map(op1…
>> val result2 = myrdd.map(op2(_))
>> result2.count() // Will load `myrdd` from HDFS and do map(op2…
>> ```
>>
>> On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi <jeffsar...@hotmail.com>
>> wrote:
>>
>>> Calling cache/persist fails all our jobs (i have  posted 2 threads on
>>> this).
>>>
>>> And we're giving up hope in finding a solution.
>>> So I'd like to find a workaround for that:
>>>
>>> If I save an RDD to hdfs and read it back, can I use it in more than one
>>> operation?
>>>
>>> Example: (using cache)
>>> // do a whole bunch of transformations on an RDD
>>>
>>> myrdd.cache()
>>>
>>> val result1 = myrdd.map(op1(_))
>>>
>>> val result2 = myrdd.map(op2(_))
>>>
>>> // in the above I am assuming that a call to cache will prevent all
>>> previous transformation from being calculated twice
>>>
>>> I'd like to somehow get result1 and result2 without duplicating work.
>>> How can I do that?
>>>
>>> thanks
>>>
>>> Jeff
>>>
>>
>>
>


Re: How can i remove the need for calling cache

2017-08-02 Thread Vadim Semenov
I'm not sure that "checkpointed" means the same thing in that sentence.

You can run a simple test using `spark-shell`:

sc.setCheckpointDir("/tmp/checkpoint")
val rdd = sc.parallelize(1 to 10).map(x => {
  Thread.sleep(1000)
  x
})
rdd.checkpoint()
rdd.foreach(println) // Will take 10 seconds
rdd.foreach(println) // Will be instant, because the RDD is checkpointed

On Wed, Aug 2, 2017 at 7:05 PM, jeff saremi <jeffsar...@hotmail.com> wrote:

> Vadim:
>
> This is from the Mastering Spark book:
>
> *"It is strongly recommended that a checkpointed RDD is persisted in
> memory, otherwise saving it on a file will require recomputation."*
>
>
> To me that means checkpoint will not prevent the recomputation that i was
> hoping for
> --
> *From:* Vadim Semenov <vadim.seme...@datadoghq.com>
> *Sent:* Tuesday, August 1, 2017 12:05:17 PM
> *To:* jeff saremi
> *Cc:* user@spark.apache.org
> *Subject:* Re: How can i remove the need for calling cache
>
> You can use `.checkpoint()`:
> ```
> val sc: SparkContext
> sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
> myrdd.checkpoint()
> val result1 = myrdd.map(op1(_))
> result1.count() // Will save `myrdd` to HDFS and do map(op1…
> val result2 = myrdd.map(op2(_))
> result2.count() // Will load `myrdd` from HDFS and do map(op2…
> ```
>
> On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi <jeffsar...@hotmail.com>
> wrote:
>
>> Calling cache/persist fails all our jobs (i have  posted 2 threads on
>> this).
>>
>> And we're giving up hope in finding a solution.
>> So I'd like to find a workaround for that:
>>
>> If I save an RDD to hdfs and read it back, can I use it in more than one
>> operation?
>>
>> Example: (using cache)
>> // do a whole bunch of transformations on an RDD
>>
>> myrdd.cache()
>>
>> val result1 = myrdd.map(op1(_))
>>
>> val result2 = myrdd.map(op2(_))
>>
>> // in the above I am assuming that a call to cache will prevent all
>> previous transformation from being calculated twice
>>
>> I'd like to somehow get result1 and result2 without duplicating work. How
>> can I do that?
>>
>> thanks
>>
>> Jeff
>>
>
>


Re: How can i remove the need for calling cache

2017-08-01 Thread Vadim Semenov
You can use `.checkpoint()`:
```
val sc: SparkContext
sc.setCheckpointDir("hdfs:///tmp/checkpointDirectory")
myrdd.checkpoint()
val result1 = myrdd.map(op1(_))
result1.count() // Will save `myrdd` to HDFS and do map(op1…
val result2 = myrdd.map(op2(_))
result2.count() // Will load `myrdd` from HDFS and do map(op2…
```

On Tue, Aug 1, 2017 at 2:05 PM, jeff saremi  wrote:

> Calling cache/persist fails all our jobs (i have  posted 2 threads on
> this).
>
> And we're giving up hope in finding a solution.
> So I'd like to find a workaround for that:
>
> If I save an RDD to hdfs and read it back, can I use it in more than one
> operation?
>
> Example: (using cache)
> // do a whole bunch of transformations on an RDD
>
> myrdd.cache()
>
> val result1 = myrdd.map(op1(_))
>
> val result2 = myrdd.map(op2(_))
>
> // in the above I am assuming that a call to cache will prevent all
> previous transformation from being calculated twice
>
> I'd like to somehow get result1 and result2 without duplicating work. How
> can I do that?
>
> thanks
>
> Jeff
>


Re: How to insert a dataframe as a static partition to a partitioned table

2017-07-20 Thread Vadim Semenov
This should work:
```
ALTER TABLE `table` ADD PARTITION (partcol=1) LOCATION
'/path/to/your/dataset'
```
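
A fuller sketch of the flow (database, table, and path names are made up):
write the DataFrame to the partition's directory first, then register that
directory as the static partition.

```
// write the (col1, col2) DataFrame under the partition's directory
df.write.parquet("/warehouse/mydb.db/target_table/partcol=1")

// then attach that directory as the static partition partcol=1
spark.sql("""
  ALTER TABLE mydb.target_table
  ADD IF NOT EXISTS PARTITION (partcol=1)
  LOCATION '/warehouse/mydb.db/target_table/partcol=1'
""")
```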

On Wed, Jul 19, 2017 at 6:13 PM, ctang  wrote:

> I wonder if there are any easy ways (or APIs) to insert a dataframe (or
> DataSet), which does not contain the partition columns, as a static
> partition to the table. For example,
> The DataSet with columns (col1, col2) will be inserted into a table (col1,
> col2) partitioned by column partcol as a static partition with partspec
> (partcol =1).
>
> Thanks
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-insert-a-dataframe-as-a-
> static-partition-to-a-partitioned-table-tp28882.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: underlying checkpoint

2017-07-13 Thread Vadim Semenov
You need to trigger an action on that rdd to checkpoint it.

```
scala> spark.sparkContext.setCheckpointDir(".")

scala> val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.rdd.checkpoint()

scala> df.rdd.isCheckpointed
res2: Boolean = false

scala> df.show()
+------+---+
|    _1| _2|
+------+---+
| Scala| 35|
|Python| 30|
|     R| 15|
|  Java| 20|
+------+---+


scala> df.rdd.isCheckpointed
res4: Boolean = false

scala> df.rdd.count()
res5: Long = 4

scala> df.rdd.isCheckpointed
res6: Boolean = true
```

On Thu, Jul 13, 2017 at 11:35 AM, Bernard Jesop 
wrote:

> Hi everyone, I just tried this simple program :
>
> import org.apache.spark.sql.SparkSession
>
> object CheckpointTest extends App {
>   val spark = SparkSession
>     .builder()
>     .appName("Toto")
>     .getOrCreate()
>
>   spark.sparkContext.setCheckpointDir(".")
>   val df = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
>
>   df.show()
>
>   df.rdd.checkpoint()
>   println(if (df.rdd.isCheckpointed) "checkpointed" else "not checkpointed")
> }
>
> But the result is still "not checkpointed".
> Do you have any idea why? (knowing that the checkpoint file is created)
>
> Best regards,
> Bernard JESOP
>


Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-05 Thread Vadim Semenov
Are you sure that you use S3A?
Because EMR says that they do not support S3A

https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/
> Amazon EMR does not currently support use of the Apache Hadoop S3A file system.

I think that the HEAD requests come from the `createBucketIfNotExists` in
the AWS S3 library that checks if the bucket exists every time you do a PUT
request, i.e. creates a HEAD request.

You can disable that by setting `fs.s3.buckets.create.enabled` to `false`
http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-upload-s3.html
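
A sketch of one way to apply that setting from the job itself (worth verifying
against the EMR docs linked above; setting it in the cluster's `emrfs-site`
classification at launch is the more reliable route, since the property needs
to be in place before the S3 filesystem is first initialized):

```
// push the property into the Hadoop configuration used for S3 writes
spark.sparkContext.hadoopConfiguration
  .set("fs.s3.buckets.create.enabled", "false")
```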

On Thu, Jun 29, 2017 at 4:56 PM, Everett Anderson 
wrote:

> Hi,
>
> We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O
> from/to S3 from our Spark jobs. We set mapreduce.
> fileoutputcommitter.algorithm.version=2 and are using encrypted S3
> buckets.
>
> This has been working fine for us, but perhaps as we've been running more
> jobs in parallel, we've started getting errors like
>
> Status Code: 503, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error
> Code: SlowDown, AWS Error Message: Please reduce your request rate., S3
> Extended Request ID: ...
>
> We enabled CloudWatch S3 request metrics for one of our buckets and I was
> a little alarmed to see spikes of over 800k S3 requests over a minute or
> so, with the bulk of them HEAD requests.
>
> We read and write Parquet files, and most tables have around 50
> shards/parts, though some have up to 200. I imagine there's additional
> parallelism when reading a shard in Parquet, though.
>
> Has anyone else encountered this? How did you solve it?
>
> I'd sure prefer to avoid copying all our data in and out of HDFS for each
> job, if possible.
>
> Thanks!
>
>


Re: How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vadim Semenov
This is the code that chooses the partition for a key:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88

it's basically a non-negative modulo of the key's hash code: `key.hashCode % numberOfPartitions`, shifted back into [0, numPartitions) when the hash code is negative (see `Utils.nonNegativeMod` in the linked file)
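
A tiny sketch of that logic, mirroring the `nonNegativeMod` helper the linked
code calls:

```
def nonNegativeMod(x: Int, mod: Int): Int = {
  val rawMod = x % mod
  rawMod + (if (rawMod < 0) mod else 0)
}

val numPartitions = 8
nonNegativeMod("aa".hashCode, numPartitions)  // same key -> same partition, every time
```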

On Fri, Jun 23, 2017 at 3:42 AM, Vikash Pareek <
vikash.par...@infoobjects.com> wrote:

> I am trying to understand how spark partitioning works.
>
> To understand this I have following piece of code on spark 1.6
>
> def countByPartition1(rdd: RDD[(String, Int)]) = {
> rdd.mapPartitions(iter => Iterator(iter.length))
> }
> def countByPartition2(rdd: RDD[String]) = {
> rdd.mapPartitions(iter => Iterator(iter.length))
> }
>
> //RDDs Creation
> val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa",
> 1)), 8)
> countByPartition(rdd1).collect()
> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>
> val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
> countByPartition(rdd2).collect()
> >> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
>
> In both cases the data is distributed uniformly.
> I have the following questions on the basis of the above observation:
>
>  1. In case of rdd1, hash partitioning should calculate hashcode of key
> (i.e. "aa" in this case), so all records should go to single partition
> instead of uniform distribution?
>  2. In the case of rdd2, there is no key-value pair, so how is hash partitioning
> going to work, i.e. what is the key used to calculate the hashcode?
>
> I have followed @zero323 answer but not getting answer of these.
> https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work
>
>
>
>
> -
>
> __Vikash Pareek
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-does-HashPartitioner-distribute-data-in-Spark-
> tp28785.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: "Sharing" dataframes...

2017-06-20 Thread Vadim Semenov
You can launch one permanent spark context and then execute your jobs
within the context. And since they'll be running in the same context, they
can share data easily.

These two projects provide the functionality that you need:
https://github.com/spark-jobserver/spark-jobserver#persistent-context-mode---faster--required-for-related-jobs
https://github.com/cloudera/livy#post-sessions
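
A minimal sketch of the shared-context idea (assuming Spark 2.1+ so global temp
views are available; the names are made up): job A publishes its result inside
the long-lived session, and job B, submitted later to the same context, picks
it up without anything touching disk.

```
// job A, running in the shared SparkSession:
val resultOfA = spark.range(100).selectExpr("id", "id * 2 AS doubled")
resultOfA.createOrReplaceGlobalTempView("result_a")

// job B, submitted later to the same context:
val fromA = spark.table("global_temp.result_a")
fromA.count()
```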

On Tue, Jun 20, 2017 at 1:46 PM, Jean Georges Perrin  wrote:

> Hey,
>
> Here is my need: program A does something on a set of data and produces
> results, program B does that on another set, and finally, program C
> combines the data of A and B. Of course, the easy way is to dump all on
> disk after A and B are done, but I wanted to avoid this.
>
> I was thinking of creating a temp view, but I do not really like the temp
> aspect of it ;). Any idea (they are all worth sharing)
>
> jg
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [How-To] Custom file format as source

2017-06-12 Thread Vadim Semenov
It should be easy to start with a custom Hadoop InputFormat that reads the
file and creates an `RDD[Row]`. Since you know the record size, it should
be pretty easy to make the InputFormat produce splits, so you could then
read the file in parallel.
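
A rough sketch of that idea using Hadoop's built-in `FixedLengthInputFormat`
(assumptions: the fixed-size rows live in their own file, or the header/catalog
section is skipped separately; the path, byte order, and the 28-byte layout
from the example below are placeholders):

```
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{BytesWritable, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat

val recordLength = 8 + 4 + 8 + 8  // Col1, Col2 index, Col3, Col4 from the example
val conf = new Configuration(spark.sparkContext.hadoopConfiguration)
FixedLengthInputFormat.setRecordLength(conf, recordLength)

val rows = spark.sparkContext.newAPIHadoopFile(
    "/path/to/data.bin",
    classOf[FixedLengthInputFormat],
    classOf[LongWritable],
    classOf[BytesWritable],
    conf
  ).map { case (_, bytes) =>
    // decode one fixed-size record into its four typed fields
    val buf = ByteBuffer.wrap(bytes.copyBytes()).order(ByteOrder.LITTLE_ENDIAN)
    (buf.getLong, buf.getInt, buf.getLong, buf.getDouble)
  }

import spark.implicits._
val df = rows.toDF("col1", "col2_idx", "col3", "col4")  // map col2_idx back via its catalog
```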

On Mon, Jun 12, 2017 at 6:01 AM, OBones  wrote:

> Hello,
>
> I have an application here that generates data files in a custom binary
> format that provides the following information:
>
> Column list, each column has a data type (64 bit integer, 32 bit string
> index, 64 bit IEEE float, 1 byte boolean)
> Catalogs that give modalities for some columns (ie, column 1 contains only
> the following values: A, B, C, D)
> Array for actual data, each row has a fixed size according to the columns.
>
> Here is an example:
>
> Col1, 64bit integer
> Col2, 32bit string index
> Col3, 64bit integer
> Col4, 64bit float
>
> Catalog for Col1 = 10, 20, 30, 40, 50
> Catalog for Col2 = Big, Small, Large, Tall
> Catalog for Col3 = 101, 102, 103, 500, 5000
> Catalog for Col4 = (no catalog)
>
> Data array =
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> 8 bytes, 4 bytes, 8 bytes, 8 bytes,
> ...
>
> I would like to use this kind of file as a source for various ML related
> computations (CART, RandomForrest, Gradient boosting...) and Spark is very
> interesting in this area.
> However, I'm a bit lost as to what I should write to have Spark use that
> file format as a source for its computation. Considering that those files
> are quite big (100 million lines, hundreds of gigs on disk), I'd rather not
> create something that writes a new file in a built-in format, but I'd
> rather write some code that makes Spark accept the file as it is.
>
> I looked around and saw the textfile method but it is not applicable to my
> case. I also saw the spark.read.format("libsvm") syntax which tells me that
> there is a list of supported formats known to spark, which I believe are
> called Dataframes, but I could not find any tutorial on this subject.
>
> Would you have any suggestion or links to documentation that would get me
> started?
>
> Regards,
> Olivier
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Spark Core] Does spark support read from remote Hive server via JDBC

2017-06-08 Thread Vadim Semenov
Have you tried running a query? something like:

```
test.select("*").limit(10).show()
```

On Thu, Jun 8, 2017 at 4:16 AM, Даша Ковальчук 
wrote:

> Hi guys,
>
> I need to execute hive queries on remote hive server from spark, but for
> some reasons i receive only column names(without data).
> Data available in table, I checked it via HUE and java jdbc connection.
>
> Here is my code example:
> val test = spark.read
> .option("url", "jdbc:hive2://remote.hive.server:1/work_base")
> .option("user", "user")
> .option("password", "password")
> .option("dbtable", "some_table_with_data")
> .option("driver", "org.apache.hive.jdbc.HiveDriver")
> .format("jdbc")
> .load()
> test.show()
>
>
> Scala version: 2.11
> Spark version: 2.1.0, i also tried 2.1.1
> Hive version: CDH 5.7 Hive 1.1.1
> Hive JDBC version: 1.1.1
>
> But this problem occurs with later Hive versions, too.
> I didn't find anything in the mailing list archives or on StackOverflow.
> Could you please help me with this issue, or help me find the correct
> solution for querying remote Hive from Spark?
>
> Thanks in advance!
>


Re: Bizarre UI Behavior after migration

2017-05-22 Thread Vadim Semenov
I believe it shows only the tasks that have actually been executed; if
there were tasks with no data, they don't get reported.

I might be mistaken; if somebody has a good explanation, I would also like to
hear it.

On Fri, May 19, 2017 at 5:45 PM, Miles Crawford  wrote:

> Hey ya'll,
>
> Trying to migrate from Spark 1.6.1 to 2.1.0.
>
> I use EMR, and launched a new cluster using EMR 5.5, which runs spark
> 2.1.0.
>
> I updated my dependencies, and fixed a few API changes related to
> accumulators, and presto! my application was running on the new cluster.
>
> But the application UI shows crazy output:
> https://www.dropbox.com/s/egtj1056qeudswj/sparkwut.png?dl=0
>
> The applications seem to complete successfully, but I was wondering if
> anyone has an idea of what might be going wrong?
>
> Thanks,
> -Miles
>


Re: Spark <--> S3 flakiness

2017-05-11 Thread Vadim Semenov
Use the official mailing list archive

http://mail-archives.apache.org/mod_mbox/spark-user/201705.mbox/%3ccajyeq0gh1fbhbajb9gghognhqouogydba28lnn262hfzzgf...@mail.gmail.com%3e

On Thu, May 11, 2017 at 2:50 PM, lucas.g...@gmail.com 
wrote:

> Also, and this is unrelated to the actual question... Why don't these
> messages show up in the archive?
>
> http://apache-spark-user-list.1001560.n3.nabble.com/
>
> Ideally I'd want to post a link to our internal wiki for these questions,
> but can't find them in the archive.
>
> On 11 May 2017 at 07:16, lucas.g...@gmail.com 
> wrote:
>
>> Looks like this isn't viable in spark 2.0.0 (and greater I presume).  I'm
>> pretty sure I came across this blog and ignored it due to that.
>>
>> Any other thoughts?  The linked tickets in: https://issues.apache.org/
>> jira/browse/SPARK-10063 https://issues.apache.org/jira/brows
>> e/HADOOP-13786 https://issues.apache.org/jira/browse/HADOOP-9565 look
>> relevant too.
>>
>> On 10 May 2017 at 22:24, Miguel Morales  wrote:
>>
>>> Try using the DirectParquetOutputCommiter:
>>> http://dev.sortable.com/spark-directparquetoutputcommitter/
>>>
>>> On Wed, May 10, 2017 at 10:07 PM, lucas.g...@gmail.com
>>>  wrote:
>>> > Hi users, we have a bunch of pyspark jobs that are using S3 for
>>> loading /
>>> > intermediate steps and final output of parquet files.
>>> >
>>> > We're running into the following issues on a semi regular basis:
>>> > * These are intermittent errors, IE we have about 300 jobs that run
>>> > nightly... And a fairly random but small-ish percentage of them fail
>>> with
>>> > the following classes of errors.
>>> >
>>> > S3 write errors
>>> >
>>> >> "ERROR Utils: Aborting task
>>> >> com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 404,
>>> AWS
>>> >> Service: Amazon S3, AWS Request ID: 2D3RP, AWS Error Code: null, AWS
>>> Error
>>> >> Message: Not Found, S3 Extended Request ID: BlaBlahEtc="
>>> >
>>> >
>>> >>
>>> >> "Py4JJavaError: An error occurred while calling o43.parquet.
>>> >> : com.amazonaws.services.s3.model.MultiObjectDeleteException: Status
>>> Code:
>>> >> 0, AWS Service: null, AWS Request ID: null, AWS Error Code: null, AWS
>>> Error
>>> >> Message: One or more objects could not be deleted, S3 Extended
>>> Request ID:
>>> >> null"
>>> >
>>> >
>>> >
>>> > S3 Read Errors:
>>> >
>>> >> [Stage 1:=>
>>>  (27 + 4)
>>> >> / 31]17/05/10 16:25:23 ERROR Executor: Exception in task 10.0 in
>>> stage 1.0
>>> >> (TID 11)
>>> >> java.net.SocketException: Connection reset
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:196)
>>> >> at java.net.SocketInputStream.read(SocketInputStream.java:122)
>>> >> at sun.security.ssl.InputRecord.readFully(InputRecord.java:442)
>>> >> at sun.security.ssl.InputRecord.readV3Record(InputRecord.java:554)
>>> >> at sun.security.ssl.InputRecord.read(InputRecord.java:509)
>>> >> at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927)
>>> >> at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.
>>> java:884)
>>> >> at sun.security.ssl.AppInputStream.read(AppInputStream.java:102)
>>> >> at
>>> >> org.apache.http.impl.io.AbstractSessionInputBuffer.read(Abst
>>> ractSessionInputBuffer.java:198)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:178)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.read(Conten
>>> tLengthInputStream.java:200)
>>> >> at
>>> >> org.apache.http.impl.io.ContentLengthInputStream.close(Conte
>>> ntLengthInputStream.java:103)
>>> >> at
>>> >> org.apache.http.conn.BasicManagedEntity.streamClosed(BasicMa
>>> nagedEntity.java:168)
>>> >> at
>>> >> org.apache.http.conn.EofSensorInputStream.checkClose(EofSens
>>> orInputStream.java:228)
>>> >> at
>>> >> org.apache.http.conn.EofSensorInputStream.close(EofSensorInp
>>> utStream.java:174)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at java.io.FilterInputStream.close(FilterInputStream.java:181)
>>> >> at com.amazonaws.services.s3.model.S3Object.close(S3Object.java:203)
>>> >> at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream
>>> .java:187)
>>> >
>>> >
>>> >
>>> > We have literally tons of logs we can add but it would make the email
>>> > unwieldy big.  If it would be helpful I'll drop them in a pastebin or
>>> > something.
>>> >
>>> > Our config is along the lines of:
>>> >
>>> > spark-2.1.0-bin-hadoop2.7
>>> > '--packages
>>> > com.amazonaws:aws-java-sdk:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0
>>> > pyspark-shell'
>>> >
>>> > Given the stack overflow / googling I've been doing I know we're not
>>> the
>>> > only org with these issues but I haven't found a good set of solutions
>>> in
>>> 

Re: Looking at EMR Logs

2017-03-31 Thread Vadim Semenov
You can provide your own log directory, where the Spark event logs will be
saved and which you can replay afterwards.

Set in your job this: `spark.eventLog.dir=s3://bucket/some/directory` and
run it.
Note! The path `s3://bucket/some/directory` must exist before you run your
job, it'll not be created automatically.

The Spark HistoryServer on EMR won't show you anything because it's looking
for logs in `hdfs:///var/log/spark/apps` by default.

After that you can either copy the log files from s3 to the hdfs path
above, or you can copy them locally to `/tmp/spark-events` (the default
directory for spark logs) and run the history server like:
```
cd /usr/local/src/spark-1.6.1-bin-hadoop2.6
sbin/start-history-server.sh
```
and then open http://localhost:18080
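
A sketch of wiring that up from the job itself (the bucket path is the
placeholder used above; passing the same two settings via `spark-submit --conf`
works equally well):

```
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "s3://bucket/some/directory")  // must already exist
val sc = new SparkContext(conf)
```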




On Thu, Mar 30, 2017 at 8:45 PM, Paul Tremblay 
wrote:

> I am looking for tips on evaluating my Spark job after it has run.
>
> I know that right now I can look at the history of jobs through the web
> ui. I also know how to look at the current resources being used by a
> similar web ui.
>
> However, I would like to look at the logs after the job is finished to
> evaluate such things as how many tasks were completed, how many executors
> were used, etc. I currently save my logs to S3.
>
> Thanks!
>
> Henry
>
> --
> Paul Henry Tremblay
> Robert Half Technology
>


Re: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread Vadim Semenov
Check the source code for SparkLauncher:
https://github.com/apache/spark/blob/master/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java#L541

a separate process will be started using `spark-submit` and if it uses
`yarn-cluster` mode, a driver may be launched on another NodeManager or may
be launched on the same NodeManager, so you would need to work around it if
you want to avoid hot spots.
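
A small sketch of launching one child job this way (the jar path and class name
are hypothetical); with `cluster` deploy mode the child's driver gets its own
YARN container, so it does not consume the parent driver's memory:

```
import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

val handle: SparkAppHandle = new SparkLauncher()
  .setAppResource("/path/to/child-job.jar")
  .setMainClass("com.example.ChildJob")
  .setMaster("yarn")
  .setDeployMode("cluster")
  .startApplication()

// poll until the child application reaches a terminal state
while (!handle.getState.isFinal) Thread.sleep(1000)
```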

On Wed, Dec 21, 2016 at 8:19 AM, Naveen  wrote:

> Thanks Liang!
> I get your point. It would mean that when launching spark jobs, mode needs
> to be specified as client for all spark jobs.
> However, my concern is to know if driver's memory(which is launching spark
> jobs) will be used completely by the Future's(sparkcontext's) or these
> spawned sparkcontexts will get different nodes / executors from resource
> manager?
>
> On Wed, Dec 21, 2016 at 6:43 PM, Naveen  wrote:
>
>> Hi Sebastian,
>>
>> Yes, for fetching the details from Hive and HBase, I would want to use
>> Spark's HiveContext etc.
>> However, based on your point, I might have to check if JDBC based driver
>> connection could be used to do the same.
>>
>> Main reason for this is to avoid a client-server architecture design.
>>
>> If we go by a normal scala app without creating a sparkcontext as per
>> your suggestion, then
>> 1. it turns out to be a client program on cluster on a single node, and
>> for any multiple invocation through xyz scheduler , it will be invoked
>> always from that same node
>> 2. Having client program on a single data node might create a hotspot for
>> that data node which might create a bottleneck as all invocations might
>> create JVMs on that node itself.
>> 3. With above, we will loose the Spark on YARN's feature of dynamically
>> allocating a driver on any available data node through RM and NM
>> co-ordination. With YARN and Cluster mode of invoking a spark-job, it will
>> help distribute multiple application(main one) in cluster uniformly.
>>
>> Thanks and please let me know your views.
>>
>>
>> On Wed, Dec 21, 2016 at 5:43 PM, Sebastian Piu 
>> wrote:
>>
>>> Is there any reason you need a context on the application launching the
>>> jobs?
>>> You can use SparkLauncher in a normal app and just listen for state
>>> transitions
>>>
>>> On Wed, 21 Dec 2016, 11:44 Naveen,  wrote:
>>>
 Hi Team,

 Thanks for your responses.
 Let me give more details in a picture of how I am trying to launch jobs.

 Main spark job will launch other spark-job similar to calling multiple
 spark-submit within a Spark driver program.
 These spawned threads for new jobs will be totally different
 components, so these cannot be implemented using spark actions.

 sample code:

 -

 Object Mainsparkjob {

 main(...){

 val sc=new SparkContext(..)

 Fetch from hive..using hivecontext
 Fetch from hbase

 //spawning multiple Futures..
 Val future1=Future{
 Val sparkjob= SparkLauncher(...).launch; spark.waitFor
 }

 Similarly, future2 to futureN.

 future1.onComplete{...}
 }

 }// end of mainsparkjob
 --


 [image: Inline image 1]

 On Wed, Dec 21, 2016 at 3:13 PM, David Hodeffi <
 david.hode...@niceactimize.com> wrote:

 I am not familiar of any problem with that.

 Anyway, If you run spark applicaction you would have multiple jobs,
 which makes sense that it is not a problem.



 Thanks David.



 *From:* Naveen [mailto:hadoopst...@gmail.com]
 *Sent:* Wednesday, December 21, 2016 9:18 AM
 *To:* d...@spark.apache.org; user@spark.apache.org
 *Subject:* Launching multiple spark jobs within a main spark job.



 Hi Team,



 Is it ok to spawn multiple spark jobs within a main spark job? My main
 spark job's driver, which was launched on a yarn cluster, will do some
 preprocessing and, based on it, needs to launch multiple spark jobs on the
 yarn cluster. Not sure if this is the right pattern.



 Please share your thoughts.

 Sample code I have is below for better understanding:

 -



 Object Mainsparkjob {



 main(...){



 val sc=new SparkContext(..)



 Fetch from hive..using hivecontext

 Fetch from hbase



 //spawning multiple Futures..

 Val future1=Future{

 Val sparkjob= SparkLauncher(...).launch; spark.waitFor

 }



 Similarly, future2 to futureN.



 future1.onComplete{...}

 }



 }// end of mainsparkjob

 --



Re: NoClassDefFoundError

2016-12-21 Thread Vadim Semenov
You better ask folks in the spark-jobserver gitter channel:
https://github.com/spark-jobserver/spark-jobserver

On Wed, Dec 21, 2016 at 8:02 AM, Reza zade  wrote:

> Hello
>
> I've extended the JavaSparkJob (job-server-0.6.2) and created an object
> of SQLContext class. my maven project doesn't have any problem during
> compile and packaging phase. but when I send .jar of project to sjs and run
> it "NoClassDefFoundError" will be issued. the trace of exception is :
>
>
> job-server[ERROR] Exception in thread "pool-20-thread-1"
> java.lang.NoClassDefFoundError: org/apache/spark/sql/SQLContext
> job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:61)
> job-server[ERROR]  at sparkdesk.SparkSQLJob2.runJob(SparkSQLJob2.java:45)
> job-server[ERROR]  at spark.jobserver.JavaSparkJob.r
> unJob(JavaSparkJob.scala:17)
> job-server[ERROR]  at spark.jobserver.JavaSparkJob.r
> unJob(JavaSparkJob.scala:14)
> job-server[ERROR]  at spark.jobserver.JobManagerActo
> r$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.
> apply(JobManagerActor.scala:301)
> job-server[ERROR]  at scala.concurrent.impl.Future$P
> romiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> job-server[ERROR]  at scala.concurrent.impl.Future$P
> romiseCompletingRunnable.run(Future.scala:24)
> job-server[ERROR]  at java.util.concurrent.ThreadPoo
> lExecutor.runWorker(ThreadPoolExecutor.java:1145)
> job-server[ERROR]  at java.util.concurrent.ThreadPoo
> lExecutor$Worker.run(ThreadPoolExecutor.java:615)
> job-server[ERROR]  at java.lang.Thread.run(Thread.java:745)
> job-server[ERROR] Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.sql.SQLContext
> job-server[ERROR]  at java.net.URLClassLoader$1.run(
> URLClassLoader.java:366)
> job-server[ERROR]  at java.net.URLClassLoader$1.run(
> URLClassLoader.java:355)
> job-server[ERROR]  at java.security.AccessController.doPrivileged(Native
> Method)
> job-server[ERROR]  at java.net.URLClassLoader.findCl
> ass(URLClassLoader.java:354)
> job-server[ERROR]  at java.lang.ClassLoader.loadClas
> s(ClassLoader.java:425)
> job-server[ERROR]  at java.lang.ClassLoader.loadClas
> s(ClassLoader.java:358)
> job-server[ERROR]  ... 10 more
>
>
> what is the problem?
> do you have any solution about this?
>

