Intermittent Kryo serialization failures in Spark

2019-07-10 Thread Jerry Vinokurov
Hi all,

I am experiencing a strange intermittent failure of my Spark job that
results from serialization issues in Kryo. Here is the stack trace:

Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at 
> org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>   ... 204 more
>
> (I've edited the company and model name since this is proprietary code)

This error does not surface every time the job is run; I would say it
probably shows up once in every 10 runs or so, and there isn't anything
about the input data that triggers this, as I've been able to
(nondeterministically) reproduce the error by simply rerunning the job with
the same inputs over and over again. The model itself is just a plain Scala
case class whose fields are strings and integers, so there's no custom
serialization logic or anything like that. As I understand it, this seems
related to an issue previously documented here,
but allegedly this was
fixed long ago. I'm running this job on an AWS EMR cluster and have
confirmed that the version of Spark running there is 2.4.0, with the patch
that is linked in the above issue being part of the code.

One suggested solution was to set the extraClassPath config settings on the
driver and executor, but that has not fixed the problem. I'm out of ideas for
how to tackle this and would love to hear any suggestions or strategies for
fixing it.
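In case it helps others hitting the same trace: one mitigation sometimes suggested for intermittent class resolution failures inside `KryoSerializer.newKryo` is to register the classes with Kryo up front rather than relying on lazy lookup. A minimal sketch, not a confirmed fix — the class name below stands in for the redacted model class:

```scala
import org.apache.spark.SparkConf

// Sketch: register the case class (and any nested types) explicitly so the
// serializer resolves them eagerly. `com.mycompany.models.MyModel` is a
// placeholder for the real (redacted) class.
val conf = new SparkConf()
  .setAppName("my-job")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[com.mycompany.models.MyModel]))
// Equivalently, via configuration:
//   --conf spark.kryo.classesToRegister=com.mycompany.models.MyModel
```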

thanks,
Jerry

-- 
http://www.google.com/profiles/grapesmoker


Problems running TPC-H on Raspberry Pi Cluster

2019-07-10 Thread agg212
We are trying to benchmark TPC-H (scale factor 1) on a 13-node Raspberry Pi
3B+ cluster (1 master, 12 workers). Each node has 1GB of RAM and a quad-core
processor, running Ubuntu Server 18.04. The cluster is using the Spark
standalone scheduler with the *.tbl files from TPCH’s dbgen tool stored in
HDFS.

We are experiencing several failures when trying to run queries. Jobs fail
unpredictably, usually with one or more “DEAD/LOST” nodes showing in the
web UI. It appears that one or more nodes “hang” during query execution and
become unreachable or time out.

We have included our configuration parameters as well as the driver program
below. Any recommendations would be greatly appreciated.
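For worker nodes with only 1 GB of RAM, settings along these lines are often the first things to tune; the values below are illustrative starting points only, not recommendations (raising the heartbeat/network timeouts in particular helps distinguish long GC pauses from truly dead workers):

```
# spark-defaults.conf sketch for 1 GB worker nodes -- illustrative values
spark.executor.memory            512m
spark.driver.memory              512m
spark.memory.fraction            0.4
spark.executor.heartbeatInterval 20s
spark.network.timeout            600s
spark.sql.shuffle.partitions     48
```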

---

---



Driver:
---




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Parquet 'bucketBy' creates a ton of files

2019-07-10 Thread Silvio Fiorito
It really depends on the use case. Bucketing is storing the data already 
hash-partitioned. So, if you frequently perform aggregations or joins on the 
bucketing column(s) then it can save you a shuffle. You need to keep in mind 
that for joins to completely avoid a shuffle both tables would need to have the 
same bucketing.

Sorting the data may help with filtering assuming you’re using a file format 
like Parquet (e.g. if you frequently filter by account id). If you look at 
slide 11 in this talk I gave at Summit you can see a simple example: 
https://www.slideshare.net/databricks/lessons-from-the-field-episode-ii-applying-best-practices-to-your-apache-spark-applications-with-silvio-fiorito
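Concretely, the repartition-before-bucketBy pattern discussed in this thread looks roughly like this — a sketch assuming the column names and bucket count from the original question:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.col

// Sketch: repartition by the bucket columns first so each task holds whole
// buckets, yielding roughly one file per bucket instead of
// (tasks x buckets) files. `dataframe` and the names are illustrative.
def writeBucketed(dataframe: DataFrame): Unit =
  dataframe
    .repartition(500, col("bucketColumn1"), col("bucketColumn2"))
    .write
    .format("parquet")
    .bucketBy(500, "bucketColumn1", "bucketColumn2")
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table")
```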



From: Gourav Sengupta 
Date: Wednesday, July 10, 2019 at 3:14 AM
To: Silvio Fiorito 
Cc: Arwin Tio , "user@spark.apache.org" 

Subject: Re: Parquet 'bucketBy' creates a ton of files

yeah makes sense, also is there any massive performance improvement using 
bucketBy in comparison to sorting?

Regards,
Gourav

On Thu, Jul 4, 2019 at 1:34 PM Silvio Fiorito <silvio.fior...@granturing.com> wrote:
You need to first repartition (at a minimum by bucketColumn1) since each task 
will write out the buckets/files. If the bucket keys are distributed randomly 
across the RDD partitions, then you will get multiple files per bucket.

From: Arwin Tio <arwin@hotmail.com>
Date: Thursday, July 4, 2019 at 3:22 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: Parquet 'bucketBy' creates a ton of files

I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
.format("parquet")
.bucketBy(500, bucketColumn1, bucketColumn2)
.mode(SaveMode.Overwrite)
.option("path", "s3://my-bucket")
.saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors 
(not sure the terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500x500 = 250,000 bucketed parquet files! It takes forever for the
`FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a 
better way to deal with this problem? As of now it seems like I have to choose 
between lowering the parallelism of my cluster (reduce number of writers) or 
reducing the parallelism of my parquet files (reduce number of buckets), which 
will lower the parallelism of my downstream jobs.

Thanks


Re: Set TimeOut and continue with other tasks

2019-07-10 Thread Wei Chen
I am currently trying to use Future/Await to set a timeout inside the
map stage.
However, the tasks now fail instead of hanging, even though I have a Try match
to catch the timeout.
Does anyone have an idea why?

The code is like

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

files.map { file =>
  Try {
    def tmpFunc(): Boolean = { /* FILE CONVERSION ON HDFS */ ??? }
    val tmpFuture = Future[Boolean] { tmpFunc() }
    Await.result(tmpFuture, 600.seconds)
  } match {
    case Failure(e) => "F"
    case Success(r) => "S"
  }
}
```

The converter is created in a lazy function in a broadcast object,
which shouldn't be a problem.
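For what it's worth, the timeout pattern itself works outside Spark; here is a self-contained sketch. Note the caveat that `Await.result` timing out does not interrupt the underlying thread — the conversion keeps running, which may be part of why the executors still misbehave:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

// Await.result throws a TimeoutException when the Future is not done in
// time; Try turns that into a Failure. Caveat: the timed-out Future keeps
// running on its thread -- Await only stops waiting for it.
def withTimeout(work: () => Boolean, limit: Duration): String =
  Try(Await.result(Future(work()), limit)) match {
    case Success(_) => "S"
    case Failure(_) => "F"
  }

val fast = withTimeout(() => true, 1.second)                            // "S"
val slow = withTimeout(() => { Thread.sleep(5000); true }, 100.millis)  // "F"
```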

Best Regards
Wei


On Wed, Jul 10, 2019 at 3:16 PM Gourav Sengupta 
wrote:

> Is there a way you can identify those patterns in a file or in its name
> and then just tackle them in separate jobs? I use the function
> input_file_name() to find the name of input file of each record and then
> filter out certain files.
>
> Regards,
> Gourav
>
> On Wed, Jul 10, 2019 at 6:47 AM Wei Chen  wrote:
>
>> Hello All,
>>
>> I am using Spark to process some files in parallel.
>> While most files can be processed within 3 seconds,
>> we occasionally get stuck on 1 or 2 files that never finish
>> (or would take more than 48 hours).
>> Since it is a 3rd-party file conversion tool, we are not able to debug
>> why the converter gets stuck.
>>
>> Is it possible to set a timeout for our process, throw exceptions
>> for those tasks,
>> and still continue with the other successful tasks?
>>
>> Best Regards
>> Wei
>>
>


Re: Spark structured streaming sinks output late

2019-07-10 Thread Magnus Nilsson
Well, you should get updates every 10 seconds as long as there are events
surviving your quite aggressive watermark condition. Spark will try to drop
(not guaranteed) all events with a timestamp more than 500 milliseconds
before the current watermark timestamp. Try to increase the watermark
timespan and collect the max("timestamp") besides count on every trigger to
see what's going on in your stream. Could be that you have one producer out
of sync (clock sync) adding one message every two minutes. That will make
you drop all the other messages when you run with such a low watermark
tolerance.
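The debugging step suggested above might look like this — a sketch reusing `inputDf` from the quoted program, with an illustrative 10-minute tolerance:

```scala
import org.apache.spark.sql.functions.{col, count, max, window}

// Sketch: widen the watermark and track max("timestamp") next to the count
// to see how far ahead of (or behind) the watermark events actually arrive.
// `inputDf` is the Kafka source from the quoted program; 10 minutes is an
// illustrative tolerance, not a recommendation.
val debugDf = inputDf
  .selectExpr("CAST(value AS STRING)", "timestamp")
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "10 seconds"))
  .agg(count("*").as("events"), max("timestamp").as("maxTs"))
```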

Regards,

Magnus



On Wed, Jul 10, 2019 at 9:20 AM Kamalanathan Venkatesan <
kamalanatha...@in.ey.com> wrote:

> Hello,
>
>
>
> Any observations on what am I doing wrong?
>
>
>
> Thanks,
>
> -Kamal
>
>
>
> *From:* Kamalanathan Venkatesan
> *Sent:* Tuesday, July 09, 2019 7:25 PM
> *To:* 'user@spark.apache.org' 
> *Subject:* Spark structured streaming sinks output late
>
>
>
> Hello,
>
>
>
> I have the below Spark structured streaming code, and I was expecting the
> results to be printed on the console every 10 seconds. But I notice the
> sink to console happening only every ~2 minutes or more.
>
> May I know what am I doing wrong?
>
>
>
> def streaming(): Unit = {
>   System.setProperty("hadoop.home.dir", "/Documents/ ")
>   val conf: SparkConf = new SparkConf().setAppName("Histogram").setMaster("local[8]")
>   conf.set("spark.eventLog.enabled", "false");
>   val sc: SparkContext = new SparkContext(conf)
>   val sqlcontext = new SQLContext(sc)
>   val spark = SparkSession.builder().config(conf).getOrCreate()
>
>   import sqlcontext.implicits._
>   import org.apache.spark.sql.functions.window
>
>   val inputDf = spark.readStream.format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "wonderful")
>     .option("startingOffsets", "latest")
>     .load()
>   import scala.concurrent.duration._
>
>   val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
>     .withWatermark("timestamp", "500 milliseconds")
>     .groupBy(
>       window($"timestamp", "10 seconds")).count()
>
>   val consoleOutput = personJsonDf.writeStream
>     .outputMode("complete")
>     .format("console")
>     .option("truncate", "false")
>     .outputMode(OutputMode.Update())
>     .start()
>   consoleOutput.awaitTermination()
> }
>
> object SparkExecutor {
>   val spE: SparkExecutor = new SparkExecutor();
>   def main(args: Array[String]): Unit = {
>     println("test")
>     spE.streaming
>   }
> }
>
> The information contained in this communication is intended solely for the
> use of the individual or entity to whom it is addressed and others
> authorized to receive it. It may contain confidential or legally privileged
> information. If you are not the intended recipient you are hereby notified
> that any disclosure, copying, distribution or taking any action in reliance
> on the contents of this information is strictly prohibited and may be
> unlawful. If you have received this communication in error, please notify
> us immediately by responding to this email and then delete it from your
> system. The firm is neither liable for the proper and complete transmission
> of the information contained in this communication nor for any delay in its
> receipt.
>


RE: Spark structured streaming sinks output late

2019-07-10 Thread Kamalanathan Venkatesan
Hello,

Any observations on what am I doing wrong?

Thanks,
-Kamal

From: Kamalanathan Venkatesan
Sent: Tuesday, July 09, 2019 7:25 PM
To: 'user@spark.apache.org' 
Subject: Spark structured streaming sinks output late

Hello,

I have the below Spark structured streaming code, and I was expecting the
results to be printed on the console every 10 seconds. But I notice the sink
to console happening only every ~2 minutes or more.
May I know what am I doing wrong?

def streaming(): Unit = {
System.setProperty("hadoop.home.dir", "/Documents/ ")
val conf: SparkConf = new 
SparkConf().setAppName("Histogram").setMaster("local[8]")
conf.set("spark.eventLog.enabled", "false");
val sc: SparkContext = new SparkContext(conf)
val sqlcontext = new SQLContext(sc)
val spark = SparkSession.builder().config(conf).getOrCreate()

import sqlcontext.implicits._
import org.apache.spark.sql.functions.window

val inputDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "wonderful")
  .option("startingOffsets", "latest")
  .load()
import scala.concurrent.duration._

val personJsonDf = inputDf.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)", "timestamp")
  .withWatermark("timestamp", "500 milliseconds")
  .groupBy(
window($"timestamp", "10 seconds")).count()

val consoleOutput = personJsonDf.writeStream
  .outputMode("complete")
  .format("console")
  .option("truncate", "false")
  .outputMode(OutputMode.Update())
  .start()
consoleOutput.awaitTermination()
  }

object SparkExecutor {
  val spE: SparkExecutor = new SparkExecutor();
  def main(args: Array[String]): Unit = {
println("test")
spE.streaming
  }
}



Re: Set TimeOut and continue with other tasks

2019-07-10 Thread Gourav Sengupta
Is there a way you can identify those patterns in a file or in its name and
then just tackle them in separate jobs? I use the function
input_file_name() to find the name of input file of each record and then
filter out certain files.

Regards,
Gourav

On Wed, Jul 10, 2019 at 6:47 AM Wei Chen  wrote:

> Hello All,
>
> I am using Spark to process some files in parallel.
> While most files can be processed within 3 seconds,
> we occasionally get stuck on 1 or 2 files that never finish (or
> would take more than 48 hours).
> Since it is a 3rd-party file conversion tool, we are not able to debug why
> the converter gets stuck.
>
> Is it possible to set a timeout for our process, throw exceptions for
> those tasks,
> and still continue with the other successful tasks?
>
> Best Regards
> Wei
>


Re: Parquet 'bucketBy' creates a ton of files

2019-07-10 Thread Gourav Sengupta
yeah makes sense, also is there any massive performance improvement using
bucketBy in comparison to sorting?

Regards,
Gourav

On Thu, Jul 4, 2019 at 1:34 PM Silvio Fiorito 
wrote:

> You need to first repartition (at a minimum by bucketColumn1) since each
> task will write out the buckets/files. If the bucket keys are distributed
> randomly across the RDD partitions, then you will get multiple files per
> bucket.
>
>
>
> *From: *Arwin Tio 
> *Date: *Thursday, July 4, 2019 at 3:22 AM
> *To: *"user@spark.apache.org" 
> *Subject: *Parquet 'bucketBy' creates a ton of files
>
>
>
> I am trying to use Spark's **bucketBy** feature on a pretty large dataset.
>
>
>
> ```java
>
> dataframe.write()
>
> .format("parquet")
>
> .bucketBy(500, bucketColumn1, bucketColumn2)
>
> .mode(SaveMode.Overwrite)
>
> .option("path", "s3://my-bucket")
>
> .saveAsTable("my_table");
>
> ```
>
>
>
> The problem is that my Spark cluster has about 500
> partitions/tasks/executors (not sure the terminology), so I end up with
> files that look like:
>
>
>
> ```
>
> part-00001-{UUID}_00001.c000.snappy.parquet
> part-00001-{UUID}_00002.c000.snappy.parquet
> ...
> part-00001-{UUID}_00500.c000.snappy.parquet
>
> part-00002-{UUID}_00001.c000.snappy.parquet
> part-00002-{UUID}_00002.c000.snappy.parquet
> ...
> part-00002-{UUID}_00500.c000.snappy.parquet
>
> part-00500-{UUID}_00001.c000.snappy.parquet
> part-00500-{UUID}_00002.c000.snappy.parquet
> ...
> part-00500-{UUID}_00500.c000.snappy.parquet
>
> ```
>
>
>
> That's 500x500 = 250,000 bucketed parquet files! It takes forever for the
> `FileOutputCommitter` to commit that to S3.
>
>
>
> Is there a way to generate **one file per bucket**, like in Hive? Or is
> there a better way to deal with this problem? As of now it seems like I
> have to choose between lowering the parallelism of my cluster (reduce
> number of writers) or reducing the parallelism of my parquet files (reduce
> number of buckets), which will lower the parallelism of my downstream jobs.
>
>
>
> Thanks
>


Re: Pass row to UDF and select column based on pattern match

2019-07-10 Thread Gourav Sengupta
Just out of curiosity, why are you trying to use a UDF when a corresponding
built-in function is available? Or why not use SQL instead?

Regards,
Gourav
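
That said, if a UDF really is needed here, the usual trick from the linked Stack Overflow question is to wrap the columns in struct() so the UDF receives a Row. A sketch only — the Float column types and the 0.0f default are assumptions taken from the question:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// Sketch: pass the needed columns as one struct column and pattern-match
// inside the UDF. getAs must be given the expected type explicitly, which
// is what the "required: Nothing" error in the question was about.
val pickCol = udf { (r: Row) =>
  r.getAs[String]("col_x") match {
    case "a" => r.getAs[Float]("col_A")
    case "b" => r.getAs[Float]("col_B")
    case _   => 0.0f
  }
}

val df1 = df.withColumn("col_Z",
  pickCol(struct(col("col_x"), col("col_A"), col("col_B"))))
```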

On Tue, Jul 9, 2019 at 7:25 PM Femi Anthony  wrote:

> How can I achieve the following by passing a row to a udf ?
>
>
> *val df1 = df.withColumn("col_Z", *
>
> *  when($"col_x" === "a", $"col_A")*
>
> *  .when($"col_x" === "b", $"col_B")*
>
> *  .when($"col_x" === "c", $"col_C")*
>
> *  .when($"col_x" === "d", $"col_D")*
>
> *  .when($"col_x" === "e", $"col_E")*
>
> *  .when($"col_x" === "f", $"col_F")*
>
> *  .when($"col_x" === "g", $"col_G")*
>
> *   )*
>
>
> As I understand it, only columns can be passed as arguments to a UDF in
> Scala Spark.
>
>
> I have taken a look at this question:
>
>
>
> https://stackoverflow.com/questions/31816975/how-to-pass-whole-row-to-udf-spark-dataframe-filter
>
>
> and tried to implement this udf:
>
>
> *def myUDF(r:Row) = udf {*
>
>
>
> * val z : Float = r.getAs("col_x") match {*
>
> *  case "a" => r.getAs("col_A")*
>
> *  case "b" => r.getAs("col_B")*
>
> *  case other => lit(0.0)*
>
> *   }*
>
> * z*
>
> *}*
>
>
> but I'm getting a type mismatch error:
>
>
>
>  *error: type mismatch;*
>
> * found   : String("a")*
>
> * required: Nothing*
>
> * case "a" => r.getAs("col_A")*
>
> *  ^*
>
>
> What am I doing wrong ?
>
>
>
> Sent from my iPhone
>