RE: Jars directory in Spark 2.0

2017-01-31 Thread Sidney Feiner
Is this done on purpose? Because it really makes it hard to deploy 
applications. Is there a reason they didn't shade the jars they use to begin 
with?

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Koert Kuipers [mailto:ko...@tresata.com]
Sent: Tuesday, January 31, 2017 7:26 PM
To: Sidney Feiner 
Cc: user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

you basically have to keep your versions of dependencies in line with Spark's or
shade your own dependencies.

you cannot just replace the jars in Spark's jars folder. if you want to update
them you have to build Spark yourself with updated dependencies and confirm it
compiles, passes tests etc.
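
for example, with sbt-assembly you can rename the packages of a conflicting dependency inside your fat jar. a minimal sketch only (the guava pattern and the shaded prefix below are placeholders for whatever you actually need to shade):

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "myshaded.guava.@1").inAll
)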

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
> wrote:
Hey,
While migrating to Spark 2.X from 1.6, I've had many issues with jars that come 
preloaded with Spark in the "jars/" directory and I had to shade most of my 
packages.
Can I replace the jars in this folder with more up-to-date versions? Are those
jars used for anything internal in Spark, which would mean I can't blindly replace
them?

Thanks ☺


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp



Hive Java UDF running on spark-sql issue

2017-01-31 Thread Alex
Hi,


We have Java Hive UDFs which work perfectly fine in Hive.

So for better performance we are migrating them to Spark SQL.

We pass these jar files to spark-sql with the --jars argument
and define temporary functions to make them run on spark-sql.

There is one particular Java UDF which works fine on Hive, but when
run on spark-sql it gives the following errors:

Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.Long cannot be cast to
org.apache.hadoop.io.LongWritable
org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.String cannot be cast to
org.apache.hadoop.io.Text
Caused by:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: java.lang.Double cannot be cast to
org.apache.hadoop.hive.serde2.io.DoubleWritable

The piece of code that throws the error is in the switch case below:

public String getName(int pos) {
if (pos < 0 || pos >= colnames.size())
return null;
return ((StructField) colnames.get(pos)).getFieldName();
}

public int getPos(String name) {
// System.out.println(name+transactionObject.toString());
Integer pos = (Integer) transactionObject.get(name.toLowerCase());
if (pos == null)
return -1;
return pos;
}

public Object get(Object name) {
int pos = getPos((String) name);
if (pos < 0)
return null;
String f = "string";
Object obj = list.get(pos);
if (obj == null)
return null;
ObjectInspector ins = ((StructField)
colnames.get(pos)).getFieldObjectInspector();
if (ins != null)
f = ins.getTypeName();
switch (f) {
case "double":
return ((DoubleWritable) obj).get();
case "bigint":
return ((LongWritable) obj).get();
case "string":
return ((Text) obj).toString();
default:
return obj;
}
}

So I made the following code change:

public int getPos(String name) {
// System.out.println(name+transactionObject.toString());
Integer pos = (Integer) transactionObject.get(name.toLowerCase());
if (pos == null)
return -1;
return pos;
}

public Object get(Object name) {
int pos = getPos((String) name);
if (pos < 0)
return null;
String f = "string";
Object obj = list.get(pos);
Object result = null;
if (obj == null)
return null;
ObjectInspector ins = ((StructField)
colnames.get(pos)).getFieldObjectInspector();
if (ins != null)
f = ins.getTypeName();

PrimitiveObjectInspector ins2 = (PrimitiveObjectInspector) ins;
switch (ins2.getPrimitiveCategory()) {
case DOUBLE:

Double res = (Double)(((DoubleObjectInspector) ins2).get(obj));

result = (double) res;
System.out.println("printlog when double"+result);
return result;


case LONG:

Long res1 = (Long)(((LongObjectInspector) ins2).get(obj));
result = (long) res1;
System.out.println("printlog when long"+result);
return result;


case STRING:
result = (((StringObjectInspector)
ins2).getPrimitiveJavaObject(obj)).toString();
System.out.println("printlog when String"+result);
return result;

default:
result = obj;
return result;
}

}
After making these changes, the Java Hive UDF started working fine on
Spark SQL.

But it gives different results when the UDF is used in the query.

If you think you can give this issue a shot, please reach out to me
on Hangouts or reply here.


Re: does both below code do the same thing? I had to refactor code to fit in spark-sql

2017-01-31 Thread Alex
Guys! Please Reply

On Tue, Jan 31, 2017 at 12:31 PM, Alex  wrote:

> public Object get(Object name) {
> int pos = getPos((String) name);
> if (pos < 0)
> return null;
> String f = "string";
> Object obj = list.get(pos);
> Object result = null;
> if (obj == null)
> return null;
> ObjectInspector ins = ((StructField) colnames.get(pos)).
> getFieldObjectInspector();
> if (ins != null)
> f = ins.getTypeName();
>
> PrimitiveObjectInspector ins2 =
> (PrimitiveObjectInspector) ins;
> switch (ins2.getPrimitiveCategory()) {
> case DOUBLE:
>
> Double res = (Double)(((DoubleObjectInspector)
> ins2).get(obj));
> result = (double) res;
> return result;
>
>
> case LONG:
>
> Long res1 = (Long)(((LongObjectInspector)
> ins2).get(obj));
> result = (long) res1;
> return result;
>
>
> case STRING:
> result = (((StringObjectInspector)
> ins2).getPrimitiveJavaObject(obj)).toString();
> return result;
>
> default:
> result = obj;
> return result;
> }
>
> }
>
>
>
>
> Code 2 )
>
>
> public Object get(Object name) {
>   int pos = getPos((String)name);
>  if(pos<0) return null;
>  String f = "string";
>   Object obj= list.get(pos);
>  if(obj==null) return null;
>  ObjectInspector ins = ((StructField)colnames.get(
> pos)).getFieldObjectInspector();
>  if(ins!=null) f = ins.getTypeName();
>  switch (f) {
>case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((LongWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
>default  :  return obj;
>  }
> }
>
>
> But getting different results in hive and spark
>


Parameterized types and Datasets - Spark 2.1.0

2017-01-31 Thread Don Drake
I have a set of CSV files that I need to perform ETL on, with the plan to re-use
a lot of code between each file via a parent abstract class.

I tried creating the following simple abstract class that takes a
parameterized type of a case class representing the schema being read in.

This won't compile; it just complains about not being able to find an
encoder, but I'm importing the implicits and don't believe this error is right.


scala> import spark.implicits._
import spark.implicits._

scala>

scala> case class RawTemp(f1: String, f2: String, temp: Long, created_at:
java.sql.Timestamp, data_filename: String)
defined class RawTemp

scala>

scala> abstract class RawTable[A](inDir: String) {
 | def load() = {
 | spark.read
 | .option("header", "true")
 | .option("mode", "FAILFAST")
 | .option("escape", "\"")
 | .option("nullValue", "")
 | .option("inferSchema", "true")
 | .csv(inDir)
 | .as[A]
 | }
 | }
<console>:27: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   .as[A]

scala> class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
<console>:13: error: not found: type RawTable
   class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")
  ^

What's odd is that this output looks okay:

scala> val RTEncoder = Encoders.product[RawTemp]
RTEncoder: org.apache.spark.sql.Encoder[RawTemp] = class[f1[0]: string,
f2[0]: string, temp[0]: bigint, created_at[0]: timestamp, data_filename[0]:
string]

scala> RTEncoder.schema
res4: org.apache.spark.sql.types.StructType =
StructType(StructField(f1,StringType,true),
StructField(f2,StringType,true), StructField(temp,LongType,false),
StructField(created_at,TimestampType,true),
StructField(data_filename,StringType,true))

scala> RTEncoder.clsTag
res5: scala.reflect.ClassTag[RawTemp] = RawTemp

Any ideas?
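
For reference, a sketch of one possible workaround (assuming Spark 2.1 and the same REPL session): require an Encoder on the type parameter itself so that .as[A] can resolve it.

import org.apache.spark.sql.{Dataset, Encoder}
import spark.implicits._

abstract class RawTable[A : Encoder](inDir: String) {
  def load(): Dataset[A] = {
    spark.read
      .option("header", "true")
      .option("mode", "FAILFAST")
      .option("escape", "\"")
      .option("nullValue", "")
      .option("inferSchema", "true")
      .csv(inDir)
      .as[A]    // resolved via the context bound
  }
}

// the implicit Encoder[RawTemp] comes from spark.implicits._ at the definition site
class TempTable extends RawTable[RawTemp]("/user/drake/t.csv")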

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Re: Converting timezones in Spark

2017-01-31 Thread Don Drake
So, to follow up on this.

A few lessons learned: when you print a timestamp, it will only show the
date/time in your current timezone, regardless of any conversions you
applied to it.

The trick is to convert (cast) it to a Long, and then the Java 8 java.time.*
functions can translate it to any timezone and generate a string representing
the timestamp.

Here's a working example:

scala> :paste
// Entering paste mode (ctrl-D to finish)

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf

def convertToTZ(col: Long, zone: String, formatter:
DateTimeFormatter):String = {

  val i = Instant.ofEpochSecond(col)
  val z = ZonedDateTime.ofInstant(i, ZoneId.of(zone))

  z.format(formatter)

}

def convertToTZFullTimestamp = udf((col: Long, zone:String) =>
convertToTZ(col, zone, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss
z")) )

val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"), (3L,
"2016-09-14 16:59:57 UTC"), (4L, "2016-11-30 12:00:01 UTC")).toDF("id",
"dts")

val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd
HH:mm:ss Z").cast("timestamp")).withColumn("EST_tz",
convertToTZFullTimestamp($"created_at".cast("long"),
lit("America/New_York")))

df2.show(4, false)


// Exiting paste mode, now interpreting.

+---+-----------------------+---------------------+-----------------------+
|id |dts                    |created_at           |EST_tz                 |
+---+-----------------------+---------------------+-----------------------+
|1  |2016-09-14 16:46:32 UTC|2016-09-14 11:46:32.0|2016-09-14 12:46:32 EDT|
|2  |not a timestamp        |null                 |null                   |
|3  |2016-09-14 16:59:57 UTC|2016-09-14 11:59:57.0|2016-09-14 12:59:57 EDT|
|4  |2016-11-30 12:00:01 UTC|2016-11-30 06:00:01.0|2016-11-30 07:00:01 EST|
+---+-----------------------+---------------------+-----------------------+

import java.time.Instant
import java.time.ZonedDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.functions.udf
convertToTZ: (col: Long, zone: String, formatter:
java.time.format.DateTimeFormatter)String
convertToTZFullTimestamp:
org.apache.spark.sql.expressions.UserDefinedFunction
df: org.apache.spark.sql.DataFrame = [id: bigint, dts: string]
df2: org.apache.spark.sql.DataFrame = [id: bigint, dts: string ... 2 more
fields]

scala>



On Fri, Jan 27, 2017 at 12:01 PM, Don Drake  wrote:

> I'm reading CSV with a timestamp clearly identified in the UTC timezone,
> and I need to store this in a parquet format and eventually read it back
> and convert to different timezones as needed.
>
> Sounds straightforward, but this involves some crazy function calls and
> I'm seeing strange results as I build a test case.
>
> See my example below.  Why are the values for est_ts and cst_ts the same
> in rows 1 and 3 (wrong), but different and correct in row 4?  I have a
> feeling it has to do with daylight savings time, but I'm not sure where to
> resolve it.
>
> Please note that I'm in the Central timezone.
>
> Is there a better method to do this?
>
>
> Spark context available as 'sc' (master = local[*], app id =
> local-1485539128193).
>
> Spark session available as 'spark'.
>
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
>       /_/
>
>
> Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java
> 1.8.0_60)
>
> Type in expressions to have them evaluated.
>
> Type :help for more information.
>
>
> scala> :paste
>
> // Entering paste mode (ctrl-D to finish)
>
>
> import org.apache.spark.sql.Column
>
> def stringts_to_tz(col:Column, tz:String) = {
>
> from_utc_timestamp(to_utc_timestamp(from_unixtime(unix_timestamp(col,
> "yyyy-MM-dd HH:mm:ss Z")), "CST"), tz)
>
> }
>
>
> val df = Seq((1L, "2016-09-14 16:46:32 UTC"), (2L, "not a timestamp"),
> (3L, "2016-09-14 16:59:57 UTC"), (4L, "2016-11-31 12:00:01
> UTC")).toDF("id", "dts")
>
> val df2 = df.withColumn("created_at", unix_timestamp($"dts", "yyyy-MM-dd
> HH:mm:ss Z").cast("timestamp"))
>
> .withColumn("unix_ts", unix_timestamp($"dts", "yyyy-MM-dd HH:mm:ss Z"))
>
> .withColumn("local_hour", hour($"created_at"))
>
> .withColumn("s2", from_unixtime($"unix_ts"))
>
> .withColumn("s3", to_utc_timestamp($"s2", "CST"))
>
> .withColumn("s4", from_utc_timestamp($"s3", "EST"))
>
> .withColumn("utc_ts", stringts_to_tz($"dts", "UTC"))
>
> .withColumn("est_ts", stringts_to_tz($"dts", "CST"))
>
> .withColumn("cst_ts", stringts_to_tz($"dts", "EST"))
>
> df2.show(4,false)
>
> df2.printSchema
>
>
>
> // Exiting paste mode, now interpreting.
>
>
> +---+---+-+-
> -+--+---+-+-
> 

Re: Resource Leak in Spark Streaming

2017-01-31 Thread Nipun Arora
Just to be clear, the pool object creation happens in the driver code, and
not in any anonymous function that would be executed in the executor.

On Tue, Jan 31, 2017 at 10:21 PM Nipun Arora 
wrote:

> Thanks for the suggestion Ryan, I will convert it to singleton and see if
> it solves the problem.
> If a code/object is created in the driver (in this case a connection
> object is passed from a pool of objects created in the driver) and is
> passed to executors or workers, why would a new object be created in each
> executor?
> Also would this new object be created every micro-batch?
>
> I'm sorry I might not understand what is going on properly so wanted to
> ask.
>
> Thanks
> Nipun
>
>
>
> On Tue, Jan 31, 2017 at 8:28 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> The KafkaProducerPool instance is created in the driver. Right? What I
> was saying is that when a Spark job runs, it will serialize KafkaProducerPool
> and create a new instance in the executor side.
>
> You can use the singleton pattern to make sure one JVM process has only
> one KafkaProducerPool instance.
>
> On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora 
> wrote:
>
> It's a producer pool, the borrow object takes an existing kafka producer
> object if it is free, or creates one if all are being used.
> Shouldn't we re-use kafka producer objects for writing to Kafka.
>
> @ryan- can you suggest a good solution for writing a dstream to kafka
> which can be used in production?
>
> I am attaching the Kafka producer pool class, where would one issue a call
> to close():
>
> public class KafkaProducerPool implements Serializable {
>
>private static final long serialVersionUID = -1913028296093224674L;
>
>private transient ConcurrentLinkedQueue> 
> pool;
>
>private ScheduledExecutorService executorService;
>
>private final Properties properties;
>
>private final int minIdle;
>
>/**
> * Creates the pool.
> *
> * @param minIdle
> *minimum number of objects residing in the pool
> */
>public KafkaProducerPool(final int minIdle, final Properties properties) {
>   // initialize pool
>   this.properties = properties;
>   this.minIdle = minIdle;
>   initialize();
>
>}
>
>/**
> * Creates the pool.
> *
> * @param minIdle
> *minimum number of objects residing in the pool
> * @param maxIdle
> *maximum number of objects residing in the pool
> * @param validationInterval
> *time in seconds for periodical checking of minIdle / maxIdle
> *conditions in a separate thread. When the number of objects 
> is
> *less than minIdle, missing instances will be created. When 
> the
> *number of objects is greater than maxIdle, too many instances
> *will be removed.
> */
>public KafkaProducerPool(final int minIdle, final int maxIdle,
>  final long validationInterval, final Properties properties) {
>   // initialize pool
>   this.properties = properties;
>   this.minIdle = minIdle;
>   initialize();
>
>   // check pool conditions in a separate thread
>   executorService = Executors.newSingleThreadScheduledExecutor();
>   executorService.scheduleWithFixedDelay(new Runnable() {
>  @Override
>  public void run() {
> int size = pool.size();
> if (size < minIdle) {
>int sizeToBeAdded = minIdle - size;
>for (int i = 0; i < sizeToBeAdded; i++) {
>   pool.add(createProducer());
>}
> } else if (size > maxIdle) {
>int sizeToBeRemoved = size - maxIdle;
>for (int i = 0; i < sizeToBeRemoved; i++) {
>   pool.poll();
>}
> }
>  }
>   }, validationInterval, validationInterval, TimeUnit.SECONDS);
>}
>
>/**
> * Gets the next free object from the pool. If the pool doesn't contain any
> * objects, a new object will be created and given to the caller of this
> * method back.
> *
> * @return T borrowed object
> */
>public synchronized KafkaProducer borrowProducer() {
>   if (pool == null)
>  initialize();
>   KafkaProducer object;
>   if ((object = pool.poll()) == null) {
>  object = createProducer();
>   }
>
>   return object;
>}
>
>/**
> * Returns object back to the pool.
> *
> *object to be returned
> */
>public void returnProducer(KafkaProducer producer) {
>   if (producer == null) {
>  return;
>   }
>   this.pool.offer(producer);
>}
>
>/**
> * Shutdown this pool.
> */
>public void shutdown() {
>   if (executorService != null) {
>  

Re: JavaRDD text matadata(file name) findings

2017-01-31 Thread Hyukjin Kwon
Hi,

Would it maybe be possible for you to switch to the text datasource with the
input_file_name function?
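
For example, a minimal sketch (assuming Spark 2.x with a SparkSession named spark; the input path is a placeholder):

import org.apache.spark.sql.functions.input_file_name

// one row per line in the "value" column, plus the full path of the file it came from
val lines = spark.read.text("/path/to/folder")
  .withColumn("source_file", input_file_name())

lines.show(5, false)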

Thanks.

On 1 Feb 2017 3:58 a.m., "Manohar753" 
wrote:

Hi All,
My Spark job is reading data from a folder containing different files with the same
structured data.
The JavaRDD that is read in is processed line by line, but is there any way to know
which file each line of data came from?
Thank you in advance for your reply.

Thanks,



--
View this message in context: http://apache-spark-user-list.
1001560.n3.nabble.com/JavaRDD-text-matadata-file-name-findings-tp28353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
The KafkaProducerPool instance is created in the driver. Right? What I was
saying is that when a Spark job runs, it will serialize KafkaProducerPool
and create a new instance on the executor side.

You can use the singleton pattern to make sure one JVM process has only one
KafkaProducerPool instance.
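
For example, a minimal sketch of the idea in Scala (a Java static holder with a lazily initialized field works the same way; KafkaProducerPool and its (minIdle, Properties) constructor are the ones posted later in this thread, and the property values are placeholders):

object PoolHolder {
  // initialized at most once per executor JVM, the first time a task touches it
  lazy val pool: KafkaProducerPool = {
    val props = new java.util.Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducerPool(4, props) // 4 = minIdle, placeholder
  }
}

Inside foreachPartition, refer to PoolHolder.pool instead of a pool instance serialized from the driver, so the same producers are reused for the lifetime of the executor.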

On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora 
wrote:

> It's a producer pool, the borrow object takes an existing kafka producer
> object if it is free, or creates one if all are being used.
> Shouldn't we re-use kafka producer objects for writing to Kafka.
>
> @ryan- can you suggest a good solution for writing a dstream to kafka
> which can be used in production?
>
> I am attaching the Kafka producer pool class, where would one issue a call
> to close():
>
> public class KafkaProducerPool implements Serializable {
>
>private static final long serialVersionUID = -1913028296093224674L;
>
>private transient ConcurrentLinkedQueue> 
> pool;
>
>private ScheduledExecutorService executorService;
>
>private final Properties properties;
>
>private final int minIdle;
>
>/**
> * Creates the pool.
> *
> * @param minIdle
> *minimum number of objects residing in the pool
> */
>public KafkaProducerPool(final int minIdle, final Properties properties) {
>   // initialize pool
>   this.properties = properties;
>   this.minIdle = minIdle;
>   initialize();
>
>}
>
>/**
> * Creates the pool.
> *
> * @param minIdle
> *minimum number of objects residing in the pool
> * @param maxIdle
> *maximum number of objects residing in the pool
> * @param validationInterval
> *time in seconds for periodical checking of minIdle / maxIdle
> *conditions in a separate thread. When the number of objects 
> is
> *less than minIdle, missing instances will be created. When 
> the
> *number of objects is greater than maxIdle, too many instances
> *will be removed.
> */
>public KafkaProducerPool(final int minIdle, final int maxIdle,
>  final long validationInterval, final Properties properties) {
>   // initialize pool
>   this.properties = properties;
>   this.minIdle = minIdle;
>   initialize();
>
>   // check pool conditions in a separate thread
>   executorService = Executors.newSingleThreadScheduledExecutor();
>   executorService.scheduleWithFixedDelay(new Runnable() {
>  @Override
>  public void run() {
> int size = pool.size();
> if (size < minIdle) {
>int sizeToBeAdded = minIdle - size;
>for (int i = 0; i < sizeToBeAdded; i++) {
>   pool.add(createProducer());
>}
> } else if (size > maxIdle) {
>int sizeToBeRemoved = size - maxIdle;
>for (int i = 0; i < sizeToBeRemoved; i++) {
>   pool.poll();
>}
> }
>  }
>   }, validationInterval, validationInterval, TimeUnit.SECONDS);
>}
>
>/**
> * Gets the next free object from the pool. If the pool doesn't contain any
> * objects, a new object will be created and given to the caller of this
> * method back.
> *
> * @return T borrowed object
> */
>public synchronized KafkaProducer borrowProducer() {
>   if (pool == null)
>  initialize();
>   KafkaProducer object;
>   if ((object = pool.poll()) == null) {
>  object = createProducer();
>   }
>
>   return object;
>}
>
>/**
> * Returns object back to the pool.
> *
> *object to be returned
> */
>public void returnProducer(KafkaProducer producer) {
>   if (producer == null) {
>  return;
>   }
>   this.pool.offer(producer);
>}
>
>/**
> * Shutdown this pool.
> */
>public void shutdown() {
>   if (executorService != null) {
>  KafkaProducer producer;
>  while ((producer = pool.poll()) != null) {
> producer.close();
>  }
>  executorService.shutdown();
>   }
>}
>
>/**
> * Creates a new producer.
> *
> * @return T new object
> */
>private KafkaProducer createProducer() {
>   KafkaProducer producer = new 
> KafkaProducer(properties);
>   return producer;
>}
>
>private void initialize() {
>   pool = new ConcurrentLinkedQueue>();
>
>   for (int i = 0; i < minIdle; i++) {
>  pool.add(createProducer());
>   }
>}
>
>public void closeAll() {
>   KafkaProducer object;
>   while ((object = pool.poll()) != null) {
>  

Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-01-31 Thread Koert Kuipers
set is currently not supported. you can use kryo encoder. there is no other
work around that i know of.

import org.apache.spark.sql.{ Encoder, Encoders }
implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
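
a quick usage sketch (assuming a SparkSession named spark and the implicit above in scope):

val ds = spark.createDataset(Seq(Set((1L, 2L), (3L, 4L)), Set((5L, 6L))))
ds.collect()  // the sets come back intact; internally they are stored as a single kryo-serialized binary column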

On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam  wrote:

> Hi guys,
>
> I got an exception like the following, when I tried to implement a user
> defined aggregation function.
>
>  Exception in thread "main" java.lang.UnsupportedOperationException: No
> Encoder found for Set[(scala.Long, scala.Long)]
>
> The Set[(Long, Long)] is a field in the case class which is the output
> type for the aggregation.
>
> Is there a workaround for this?
>
> Best Regards,
>
> Jerry
>


Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-01-31 Thread Jerry Lam
Hi guys,

I got an exception like the following, when I tried to implement a user
defined aggregation function.

 Exception in thread "main" java.lang.UnsupportedOperationException: No
Encoder found for Set[(scala.Long, scala.Long)]

The Set[(Long, Long)] is a field in the case class which is the output type
for the aggregation.

Is there a workaround for this?

Best Regards,

Jerry


Re: eager? in dataframe's checkpoint

2017-01-31 Thread Koert Kuipers
i thought RDD.checkpoint is async? checkpointData is indeed updated
synchronously, but checkpointData.isCheckpointed is false until the actual
checkpoint operation has completed. and until the actual checkpoint
operation is done any operation will be on the original rdd.

for example notice how below it prints "not yet materialized" 6 times,
instead of just 3 times if the count had operated on the checkpoint data.

scala> val x = sc.parallelize(1 to 3).map{ (i) => println("not yet
materialized"); i }
x: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at
:24

scala> x.checkpoint(); println("is checkpointed? " + x.isCheckpointed);
println("count " + x.count)
is checkpointed? false
not yet materialized
not yet materialized
not yet materialized
not yet materialized
not yet materialized
not yet materialized
count 3






On Tue, Jan 31, 2017 at 4:18 PM, Burak Yavuz  wrote:

> Hi Koert,
>
> When eager is true, we return you a new DataFrame that depends on the
> files written out to the checkpoint directory.
> All previous operations on the checkpointed DataFrame are gone forever.
> You basically start fresh. AFAIK, when eager is true, the method will not
> return until the DataFrame is completely checkpointed. If you look at the
> RDD.checkpoint implementation, the checkpoint location is updated
> synchronously therefore during the count, `isCheckpointed` will be true.
>
> Best,
> Burak
>
> On Tue, Jan 31, 2017 at 12:52 PM, Koert Kuipers  wrote:
>
>> i understand that checkpoint cuts the lineage, but i am not fully sure i
>> understand the role of eager.
>>
>> eager simply seems to materialize the rdd early with a count, right after
>> the rdd has been checkpointed. but why is that useful? rdd.checkpoint is
>> asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed
>> will be false, and the count will be on the rdd before it was checkpointed.
>> what is the benefit of that?
>>
>>
>> On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz  wrote:
>>
>>> Hi,
>>>
>>> One of the goals of checkpointing is to cut the RDD lineage. Otherwise
>>> you run into StackOverflowExceptions. If you eagerly checkpoint, you
>>> basically cut the lineage there, and the next operations all depend on the
>>> checkpointed DataFrame. If you don't checkpoint, you continue to build the
>>> lineage, therefore while that lineage is being resolved, you may hit the
>>> StackOverflowException.
>>>
>>> HTH,
>>> Burak
>>>
>>> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin 
>>> wrote:
>>>
 Hey Sparkers,

 Trying to understand the Dataframe's checkpoint (*not* in the context
 of streaming) https://spark.apache.org/docs/latest/api/java/org
 /apache/spark/sql/Dataset.html#checkpoint(boolean)

 What is the goal of the *eager* flag?

 Thanks!

 jg

>>>
>>>
>>
>


Re: Resource Leak in Spark Streaming

2017-01-31 Thread Nipun Arora
It's a producer pool; the borrow method takes an existing Kafka producer
object if one is free, or creates one if all are being used.
Shouldn't we re-use Kafka producer objects for writing to Kafka?

@ryan- can you suggest a good solution for writing a dstream to kafka which
can be used in production?

I am attaching the Kafka producer pool class, where would one issue a call
to close():

public class KafkaProducerPool implements Serializable {

   private static final long serialVersionUID = -1913028296093224674L;

   private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;

   private ScheduledExecutorService executorService;

   private final Properties properties;

   private final int minIdle;

   /**
* Creates the pool.
*
* @param minIdle
*minimum number of objects residing in the pool
*/
   public KafkaProducerPool(final int minIdle, final Properties properties) {
  // initialize pool
  this.properties = properties;
  this.minIdle = minIdle;
  initialize();

   }

   /**
* Creates the pool.
*
* @param minIdle
*minimum number of objects residing in the pool
* @param maxIdle
*maximum number of objects residing in the pool
* @param validationInterval
*time in seconds for periodical checking of minIdle / maxIdle
*conditions in a separate thread. When the number of objects is
*less than minIdle, missing instances will be created. When the
*number of objects is greater than maxIdle, too many instances
*will be removed.
*/
   public KafkaProducerPool(final int minIdle, final int maxIdle,
 final long validationInterval, final Properties properties) {
  // initialize pool
  this.properties = properties;
  this.minIdle = minIdle;
  initialize();

  // check pool conditions in a separate thread
  executorService = Executors.newSingleThreadScheduledExecutor();
  executorService.scheduleWithFixedDelay(new Runnable() {
 @Override
 public void run() {
int size = pool.size();
if (size < minIdle) {
   int sizeToBeAdded = minIdle - size;
   for (int i = 0; i < sizeToBeAdded; i++) {
  pool.add(createProducer());
   }
} else if (size > maxIdle) {
   int sizeToBeRemoved = size - maxIdle;
   for (int i = 0; i < sizeToBeRemoved; i++) {
  pool.poll();
   }
}
 }
  }, validationInterval, validationInterval, TimeUnit.SECONDS);
   }

   /**
* Gets the next free object from the pool. If the pool doesn't contain any
* objects, a new object will be created and given to the caller of this
* method back.
*
* @return T borrowed object
*/
   public synchronized KafkaProducer<String, String> borrowProducer() {
  if (pool == null)
 initialize();
      KafkaProducer<String, String> object;
  if ((object = pool.poll()) == null) {
 object = createProducer();
  }

  return object;
   }

   /**
* Returns object back to the pool.
*
* @param producer object to be returned
*/
   public void returnProducer(KafkaProducer<String, String> producer) {
  if (producer == null) {
 return;
  }
  this.pool.offer(producer);
   }

   /**
* Shutdown this pool.
*/
   public void shutdown() {
  if (executorService != null) {
      KafkaProducer<String, String> producer;
 while ((producer = pool.poll()) != null) {
producer.close();
 }
 executorService.shutdown();
  }
   }

   /**
* Creates a new producer.
*
* @return T new object
*/
   private KafkaProducer<String, String> createProducer() {
      KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
  return producer;
   }

   private void initialize() {
      pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();

  for (int i = 0; i < minIdle; i++) {
 pool.add(createProducer());
  }
   }

   public void closeAll() {
      KafkaProducer<String, String> object;
  while ((object = pool.poll()) != null) {
 //object.flush();
 object.close();
  }
   }
}

Thanks
Nipun

On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu 
wrote:

> Looks like you create KafkaProducerPool in the driver. So when the task is
> running in the executor, it will always see a new empty KafkaProducerPool
> and create KafkaProducers. But nobody closes these KafkaProducers.
>
> On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora 
> wrote:
>
>
> Sorry for not writing the patch number, it's spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need 

Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
Looks like you create KafkaProducerPool in the driver. So when the task is
running in the executor, it will always see a new empty KafkaProducerPool
and create KafkaProducers. But nobody closes these KafkaProducers.

On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora 
wrote:

>
> Sorry for not writing the patch number, it's spark 1.6.1.
> The relevant code is here inline.
>
> Please have a look and let me know if there is a resource leak.
> Please also let me know if you need any more details.
>
> Thanks
> Nipun
>
>
> The JavaRDDKafkaWriter code is here inline:
>
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.function.VoidFunction;
> import scala.Tuple2;
>
> import java.io.Serializable;
> import java.util.Iterator;
>
> public class JavaRDDStringKafkaWriter implements Serializable, 
> VoidFunction {
>
>private static final long serialVersionUID = -865193912367180261L;
>private final KafkaProducerPool pool;
>private final String topic;
>private final Boolean kafkaAsync;
>
>public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String 
> topic, Boolean kafkaAsync) {
>   this.pool = pool;
>   this.topic = topic;
>   this.kafkaAsync = kafkaAsync;
>}
>
>@Override
>public void call(JavaRDD stringJavaRDD) throws Exception {
>   stringJavaRDD.foreachPartition(new PartitionVoidFunction(
> new RDDKafkaWriter(pool,kafkaAsync), topic));
>}
>
>private class PartitionVoidFunction implements
>  VoidFunction {
>
>   private static final long serialVersionUID = 8726871215617446598L;
>   private final RDDKafkaWriter kafkaWriter;
>   private final String topic;
>
>   public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
>  this.kafkaWriter = kafkaWriter;
>  this.topic = topic;
>   }
>
>   @Override
>   public void call(Iterator iterator) throws Exception {
>  while (iterator.hasNext()) {
> kafkaWriter.writeToKafka(topic, iterator.next());
>  }
>   }
>}
> }
>
>
> The RDDKafkaWriter is here:
>
>
> import java.io.Serializable;
> import java.util.concurrent.ExecutionException;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> import scala.Tuple2;
>
> public class RDDKafkaWriter implements Serializable {
>
>private static final long serialVersionUID = 7374381310562055607L;
>private final KafkaProducerPool pool;
>private final Boolean async;
>
>public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
>   this.pool = pool;
>   this.async = async;
>
>}
>
>public void writeToKafka(String topic, Tuple2 message) {
>   KafkaProducer producer = pool.borrowProducer();
>   ProducerRecord record = new ProducerRecord String>(
> topic, message._1(), message._2());
>   if (async) {
>  producer.send(record);
>   } else {
>  try {
> producer.send(record).get();
>  } catch (Exception e) {
> e.printStackTrace();
>  }
>   }
>   pool.returnProducer(producer);
>}
>
> public void writeToKafka(String topic, String message) {
>
> KafkaProducer producer = pool.borrowProducer();
> ProducerRecord record = new ProducerRecord String>(topic, message);
>
> if (async) {
> producer.send(record);
> } else {
> try {
> producer.send(record).get();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> pool.returnProducer(producer);
> }
>
>
> }
>
>
>
>
>
> On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
>> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
>> resources.
>>
>> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora 
>> wrote:
>>
>> It is spark 1.6
>>
>> Thanks
>> Nipun
>>
>> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
>> shixi...@databricks.com> wrote:
>>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora 
>> wrote:
>>
>> Hi,
>>
>> I get a resource leak, where the number of file descriptors in spark
>> streaming keeps increasing. We end up with a "too many file open" error
>> eventually through an exception caused in:
>>
>> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> ---
>> Time: 148576253 ms
>> ---
>>
>> Exception in thread 

Re: Resource Leak in Spark Streaming

2017-01-31 Thread Nipun Arora
Sorry for not writing the patch number, it's spark 1.6.1.
The relevant code is here inline.

Please have a look and let me know if there is a resource leak.
Please also let me know if you need any more details.

Thanks
Nipun


The JavaRDDKafkaWriter code is here inline:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Iterator;

public class JavaRDDStringKafkaWriter implements Serializable,
      VoidFunction<JavaRDD<String>> {

   private static final long serialVersionUID = -865193912367180261L;
   private final KafkaProducerPool pool;
   private final String topic;
   private final Boolean kafkaAsync;

   public JavaRDDStringKafkaWriter(final KafkaProducerPool pool,
String topic, Boolean kafkaAsync) {
  this.pool = pool;
  this.topic = topic;
  this.kafkaAsync = kafkaAsync;
   }

   @Override
   public void call(JavaRDD<String> stringJavaRDD) throws Exception {
  stringJavaRDD.foreachPartition(new PartitionVoidFunction(
new RDDKafkaWriter(pool,kafkaAsync), topic));
   }

   private class PartitionVoidFunction implements
         VoidFunction<Iterator<String>> {

  private static final long serialVersionUID = 8726871215617446598L;
  private final RDDKafkaWriter kafkaWriter;
  private final String topic;

  public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
 this.kafkaWriter = kafkaWriter;
 this.topic = topic;
  }

  @Override
      public void call(Iterator<String> iterator) throws Exception {
 while (iterator.hasNext()) {
kafkaWriter.writeToKafka(topic, iterator.next());
 }
  }
   }
}


The RDDKafkaWriter is here:


import java.io.Serializable;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import scala.Tuple2;

public class RDDKafkaWriter implements Serializable {

   private static final long serialVersionUID = 7374381310562055607L;
   private final KafkaProducerPool pool;
   private final Boolean async;

   public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
  this.pool = pool;
  this.async = async;

   }

   public void writeToKafka(String topic, Tuple2<String, String> message) {
      KafkaProducer<String, String> producer = pool.borrowProducer();
      ProducerRecord<String, String> record = new
            ProducerRecord<String, String>(topic, message._1(), message._2());
  if (async) {
 producer.send(record);
  } else {
 try {
producer.send(record).get();
 } catch (Exception e) {
e.printStackTrace();
 }
  }
  pool.returnProducer(producer);
   }

public void writeToKafka(String topic, String message) {

    KafkaProducer<String, String> producer = pool.borrowProducer();
    ProducerRecord<String, String> record = new
            ProducerRecord<String, String>(topic, message);

if (async) {
producer.send(record);
} else {
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
}
pool.returnProducer(producer);
}


}





On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu 
wrote:

> Please also include the patch version, such as 1.6.0, 1.6.1. Could you
> also post the JAVARDDKafkaWriter codes. It's also possible that it leaks
> resources.
>
> On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora 
> wrote:
>
> It is spark 1.6
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
> Could you provide your Spark version please?
>
> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora 
> wrote:
>
> Hi,
>
> I get a resource leak, where the number of file descriptors in spark
> streaming keeps increasing. We end up with a "too many file open" error
> eventually through an exception caused in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> ---
> Time: 148576253 ms
> ---
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)

Re: Kafka dependencies in Eclipse project /Pls assist

2017-01-31 Thread Cody Koeninger
spark-streaming-kafka-0-10 has a transitive dependency on the kafka
library, you shouldn't need to include kafka explicitly.

What's your actual list of dependencies?
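
For reference, a minimal sbt sketch matching the setup described below (Spark 2.0.1, Scala 2.11); kafka itself is not declared because it comes in transitively:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.0.1",
  "org.apache.spark" %% "spark-streaming"            % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.0.1"
)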

On Tue, Jan 31, 2017 at 3:49 PM, Marco Mistroni  wrote:
> HI all
>   i am trying to run a sample spark code which reads streaming data from
> Kafka
> I Have followed instructions here
>
> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>
>
> Here's my setup
> Spark: 2.0.1
> Kafka:0.10.1.1
> Scala Version: 2.11
>
>
>
> Libraries used
> - spark-streaming-kafka-0.10_2.11-2.0.1
> - kafka-_2.11-0.10.0.1.jar
>
> These are my imports
>
> import org.apache.spark.streaming.kafka010.KafkaUtils
> import
> org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
> import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>
> But Eclipse is giving me the following errors:
> Missing or invlaid dependency detected while loading class file
> KafkaUtils.class. Could not access term clients in value org.apache.kafka
> because it (or its dependencies) are missing.
> Missing or invalid dependency detected while loading class file
> KafkaUtils.class Could not access term kafka in package org.apache because
> it (or its dependencies are missing)
> missing or invalid dependencies detected while loading class file
> KafkaUtils.class: could not access type ConsumerRecord in value
> org.apache.consumer because it(or its dependencies are missing)
>
> So it seems i have some dependencies clashing. Has any one encountered a
> similar error?
>
> kr
>  marco

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
Please also include the patch version, such as 1.6.0, 1.6.1. Could you also
post the JAVARDDKafkaWriter codes. It's also possible that it leaks
resources.

On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora 
wrote:

> It is spark 1.6
>
> Thanks
> Nipun
>
> On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Could you provide your Spark version please?
>>
>> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora 
>> wrote:
>>
>> Hi,
>>
>> I get a resource leak, where the number of file descriptors in spark
>> streaming keeps increasing. We end up with a "too many file open" error
>> eventually through an exception caused in:
>>
>> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>>
>> The exception is attached inline. Any help will be greatly appreciated.
>>
>> Thanks
>> Nipun
>>
>> ---
>> Time: 148576253 ms
>> ---
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
>> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
>> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
>> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
>> (too many open files)
>> at java.io.FileOutputStream.open(Native Method)
>> at java.io.FileOutputStream.(FileOutputStream.java:221)
>> at org.apache.spark.storage.DiskBlockObjectWriter.open(
>> DiskBlockObjectWriter.scala:88)
>> at org.apache.spark.storage.DiskBlockObjectWriter.write(
>> DiskBlockObjectWriter.scala:181)
>> at org.apache.spark.util.collection.WritablePartitionedPairCollect
>> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
>> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
>> ExternalSorter.scala:659)
>> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
>> SortShuffleWriter.scala:72)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:73)
>> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
>> ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1145)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$failJobAndIndependentStages(
>> DAGScheduler.scala:1431)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1419)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
>> DAGScheduler.scala:1418)
>> at scala.collection.mutable.ResizableArray$class.foreach(
>> ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(
>> DAGScheduler.scala:1418)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
>> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
>> DAGScheduler.scala:799)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> doOnReceive(DAGScheduler.scala:1640)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1599)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
>> onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:920)
>> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
>> apply(RDD.scala:918)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:150)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
>> at org.apache.spark.api.java.JavaRDDLike$class.
>> foreachPartition(JavaRDDLike.scala:225)
>> at org.apache.spark.api.java.AbstractJavaRDDLike.
>> foreachPartition(JavaRDDLike.scala:46)
>> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
>> 

Re: Resource Leak in Spark Streaming

2017-01-31 Thread Nipun Arora
It is spark 1.6

Thanks
Nipun

On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu 
wrote:

> Could you provide your Spark version please?
>
> On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora 
> wrote:
>
> Hi,
>
> I get a resource leak, where the number of file descriptors in spark
> streaming keeps increasing. We end up with a "too many file open" error
> eventually through an exception caused in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> ---
> Time: 148576253 ms
> ---
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException:
> /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
> at
> org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
> at
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
> at
> org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
> at
> org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
> at
> org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
> at
> org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at
> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at
> 

Re: HBase Spark

2017-01-31 Thread Benjamin Kim
Elek,

If I cannot use the HBase Spark module, then I’ll give it a try.

Thanks,
Ben


> On Jan 31, 2017, at 1:02 PM, Marton, Elek  wrote:
> 
> 
> I tested this one with hbase 1.2.4:
> 
> https://github.com/hortonworks-spark/shc
> 
> Marton
> 
> On 01/31/2017 09:17 PM, Benjamin Kim wrote:
>> Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I 
>> tried to build it from source, but I cannot get it to work.
>> 
>> Thanks,
>> Ben
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Kafka dependencies in Eclipse project /Pls assist

2017-01-31 Thread Marco Mistroni
HI all
  i am trying to run a sample spark code which reads streaming data from
Kafka
I Have followed instructions here

https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html


Here's my setup
Spark: 2.0.1
Kafka:0.10.1.1
Scala Version: 2.11



Libraries used
- spark-streaming-kafka-0.10_2.11-2.0.1
- kafka-_2.11-0.10.0.1.jar

These are my imports

import org.apache.spark.streaming.kafka010.KafkaUtils
import
org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

But Eclipse is giving me the following errors:
Missing or invalid dependency detected while loading class file
KafkaUtils.class. Could not access term clients in value org.apache.kafka
because it (or its dependencies) are missing.
Missing or invalid dependency detected while loading class file
KafkaUtils.class Could not access term kafka in package org.apache because
it (or its dependencies are missing)
missing or invalid dependencies detected while loading class file
KafkaUtils.class: could not access type ConsumerRecord in value
org.apache.consumer because it(or its dependencies are missing)

So it seems I have some dependencies clashing. Has anyone encountered a
similar error?

kr
 marco


Re: Question about "Output Op Duration" in SparkStreaming Batch details UX

2017-01-31 Thread Shixiong(Ryan) Zhu
It means the total time to run a batch, including the Spark job duration +
time spent on the driver. E.g.,

foreachRDD { rdd =>
   rdd.count() // say this takes 1 second.
   Thread.sleep(10000) // sleep 10 seconds
}

In the above example, the Spark job duration is 1 second and the output op
duration is 11 seconds.


On Tue, Jan 31, 2017 at 12:56 PM, satishl  wrote:

> For Spark Streaming Apps, what does "Output Op Duration" in the batch
> details
> UX signify?
> We have been observing that - for the given batch's last output Op id -
> Output Op duration > Job duration by a factor. Sometimes it is huge (1
> min).
> I have provided the screenshot below where - you can see that for Output Op
> Id 5, Job Duration is 59ms but output Op duration is 19s. There is no other
> info on where the extra 18.9 seconds were spent.
> On clicking the JobId - there is no breakdown on how this extra time is
> spent.
>
>
>  file/n28354/outputopduration.png>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Question-about-Output-Op-Duration-in-SparkStreaming-
> Batch-details-UX-tp28354.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


increasing cross join speed

2017-01-31 Thread Kürşat Kurt
Hi,

I have 2 dataframes. I am trying to cross join them to compute vector distances,
so that I can then choose the most similar vectors.

The cross join is too slow. How can I increase the speed, or do you have any
suggestion for this comparison?

 

 

val result=myDict.join(mainDataset).map(x=>{

 

   val orgClassName1 =x.getAs[SparseVector](1); 

   val orgClassName2 =x.getAs[SparseVector](2);

   val f1=x.getAs[SparseVector](3);

   val f2=x.getAs[SparseVector](4);

   val dist=Vectors.sqdist(f1,f2);

   

   (orgClassName1, orgClassName2,dist)

 }).toDF("orgClassName1", "orgClassName2", "dist");
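
A sketch of one commonly suggested tweak (assuming myDict is the smaller side and fits in executor memory): broadcast it, so the join is executed as a broadcast nested-loop join instead of a shuffled cartesian product. The names and the distance computation are the same as in the snippet above.

import org.apache.spark.sql.functions.broadcast

val result = broadcast(myDict).join(mainDataset).map { x =>
  val orgClassName1 = x.getAs[SparseVector](1)
  val orgClassName2 = x.getAs[SparseVector](2)
  val dist = Vectors.sqdist(x.getAs[SparseVector](3), x.getAs[SparseVector](4))
  (orgClassName1, orgClassName2, dist)
}.toDF("orgClassName1", "orgClassName2", "dist")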

 

 

 



Re: eager? in dataframe's checkpoint

2017-01-31 Thread Burak Yavuz
Hi Koert,

When eager is true, we return you a new DataFrame that depends on the files
written out to the checkpoint directory.
All previous operations on the checkpointed DataFrame are gone forever. You
basically start fresh. AFAIK, when eager is true, the method will not
return until the DataFrame is completely checkpointed. If you look at the
RDD.checkpoint implementation, the checkpoint location is updated
synchronously therefore during the count, `isCheckpointed` will be true.
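
In code form, a small sketch (assuming Spark 2.1, an existing DataFrame df, and that sc.setCheckpointDir has already been called):

val checkpointed = df.checkpoint(eager = true) // materializes df to the checkpoint dir before returning
checkpointed.count()                           // runs against the checkpointed data; the old lineage is cut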

Best,
Burak

On Tue, Jan 31, 2017 at 12:52 PM, Koert Kuipers  wrote:

> i understand that checkpoint cuts the lineage, but i am not fully sure i
> understand the role of eager.
>
> eager simply seems to materialize the rdd early with a count, right after
> the rdd has been checkpointed. but why is that useful? rdd.checkpoint is
> asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed
> will be false, and the count will be on the rdd before it was checkpointed.
> what is the benefit of that?
>
>
> On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz  wrote:
>
>> Hi,
>>
>> One of the goals of checkpointing is to cut the RDD lineage. Otherwise
>> you run into StackOverflowExceptions. If you eagerly checkpoint, you
>> basically cut the lineage there, and the next operations all depend on the
>> checkpointed DataFrame. If you don't checkpoint, you continue to build the
>> lineage, therefore while that lineage is being resolved, you may hit the
>> StackOverflowException.
>>
>> HTH,
>> Burak
>>
>> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin 
>> wrote:
>>
>>> Hey Sparkers,
>>>
>>> Trying to understand the Dataframe's checkpoint (*not* in the context
>>> of streaming) https://spark.apache.org/docs/latest/api/java/org
>>> /apache/spark/sql/Dataset.html#checkpoint(boolean)
>>>
>>> What is the goal of the *eager* flag?
>>>
>>> Thanks!
>>>
>>> jg
>>>
>>
>>
>


Re: HBase Spark

2017-01-31 Thread Marton, Elek


I tested this one with hbase 1.2.4:

https://github.com/hortonworks-spark/shc

Marton

On 01/31/2017 09:17 PM, Benjamin Kim wrote:

Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I tried 
to build it from source, but I cannot get it to work.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Question about "Output Op Duration" in SparkStreaming Batch details UX

2017-01-31 Thread satishl
For Spark Streaming Apps, what does "Output Op Duration" in the batch details
UX signify?
We have been observing that - for the given batch's last output Op id -
Output Op duration > Job duration by a factor. Sometimes it is huge (1 min). 
I have provided the screenshot below where - you can see that for Output Op
Id 5, Job Duration is 59ms but output Op duration is 19s. There is no other
info on where the extra 18.9 seconds were spent. 
On clicking the JobId - there is no breakdown on how this extra time is 
spent.



 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Question-about-Output-Op-Duration-in-SparkStreaming-Batch-details-UX-tp28354.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: eager? in dataframe's checkpoint

2017-01-31 Thread Koert Kuipers
i understand that checkpoint cuts the lineage, but i am not fully sure i
understand the role of eager.

eager simply seems to materialize the rdd early with a count, right after
the rdd has been checkpointed. but why is that useful? rdd.checkpoint is
asynchronous, so when the rdd.count happens most likely rdd.isCheckpointed
will be false, and the count will be on the rdd before it was checkpointed.
what is the benefit of that?


On Thu, Jan 26, 2017 at 11:19 PM, Burak Yavuz  wrote:

> Hi,
>
> One of the goals of checkpointing is to cut the RDD lineage. Otherwise you
> run into StackOverflowExceptions. If you eagerly checkpoint, you basically
> cut the lineage there, and the next operations all depend on the
> checkpointed DataFrame. If you don't checkpoint, you continue to build the
> lineage, therefore while that lineage is being resolved, you may hit the
> StackOverflowException.
>
> HTH,
> Burak
>
> On Thu, Jan 26, 2017 at 10:36 AM, Jean Georges Perrin  wrote:
>
>> Hey Sparkers,
>>
>> Trying to understand the Dataframe's checkpoint (*not* in the context of
>> streaming) https://spark.apache.org/docs/latest/api/java/
>> org/apache/spark/sql/Dataset.html#checkpoint(boolean)
>>
>> What is the goal of the *eager* flag?
>>
>> Thanks!
>>
>> jg
>>
>
>


Re: Spark 2.1.0 and Shapeless

2017-01-31 Thread Koert Kuipers
shading at the fat jar level can work, however it means that in your unit
tests, where spark is a provided dependency, you can still get errors because
spark is using an incompatible (newer) shapeless version. the unit tests
run with a single resolved shapeless after all.

for example spark ships with old guava but works fine with newer guava. so
in our unit tests spark uses newer guava without issues, and for the fat jar
we shade guava in sbt-assembly. it all works ok in this situation for us.
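
[As a concrete illustration of the sbt-assembly shading mentioned above -- a
sketch only, assuming sbt-assembly 0.14.x; the renamed package prefixes are
arbitrary examples.]

// build.sbt -- rename our own copies of guava/shapeless inside the fat jar so
// they cannot clash with whatever versions Spark puts on the runtime classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll,
  ShadeRule.rename("shapeless.**" -> "shaded.shapeless.@1").inAll
)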

On Tue, Jan 31, 2017 at 3:06 PM, Phil Wills  wrote:

> Are you not able to shade it when you're building your fat jar with
> something like https://github.com/sbt/sbt-assembly#shading? I would have
> thought doing the shading at the app level would be a bit less painful than
> doing it at the library level.
>
> Phil
>
> On Tue, 31 Jan 2017, 04:24 Timothy Chan,  wrote:
>
>> I'm using a library, https://github.com/guardian/scanamo, that uses
>> shapeless 2.3.2. What are my options if I want to use this with Spark
>> 2.1.0?
>>
>> Based on this: http://apache-spark-developers-list.1001551.n3.
>> nabble.com/shapeless-in-spark-2-1-0-tt20392.html
>>
>> I'm guessing I would have to release my own version of scanamo with a
>> shaded shapeless?
>>
>


HBase Spark

2017-01-31 Thread Benjamin Kim
Does anyone know how to backport the HBase Spark module to HBase 1.2.0? I tried 
to build it from source, but I cannot get it to work.

Thanks,
Ben
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 2.1.0 and Shapeless

2017-01-31 Thread Phil Wills
Are you not able to shade it when you're building your fat jar with
something like https://github.com/sbt/sbt-assembly#shading? I would have
thought doing the shading at the app level would be a bit less painful than
doing it at the library level.

Phil

On Tue, 31 Jan 2017, 04:24 Timothy Chan,  wrote:

> I'm using a library, https://github.com/guardian/scanamo, that uses
> shapeless 2.3.2. What are my options if I want to use this with Spark
> 2.1.0?
>
> Based on this:
> http://apache-spark-developers-list.1001551.n3.nabble.com/shapeless-in-spark-2-1-0-tt20392.html
>
> I'm guessing I would have to release my own version of scanamo with a
> shaded shapeless?
>


Re: ML version of Kmeans

2017-01-31 Thread Hollin Wilkins
Hey,

You could also take a look at MLeap, which provides a runtime for any Spark
transformer and does not have any dependencies on a SparkContext or Spark
libraries (excepting MLlib-local for linear algebra).

https://github.com/combust/mleap

On Tue, Jan 31, 2017 at 2:33 AM, Aseem Bansal  wrote:

> If you want to predict using dataset then transform is the way to go. If
> you want to predict on vectors then you will have to wait on this issue to
> be completed https://issues.apache.org/jira/browse/SPARK-10413
>
> On Tue, Jan 31, 2017 at 3:01 PM, Holden Karau 
> wrote:
>
>> You most likely want the transform function on KMeansModel (although that
>> works on a dataset input rather than a single element at a time).
>>
>> On Tue, Jan 31, 2017 at 1:24 AM, Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am not able to find predict method on "ML" version of Kmeans.
>>>
>>> Mllib version has a predict method.  KMeansModel.predict(point: Vector)
>>> .
>>> How to predict the cluster point for new vectors in ML version of kmeans
>>> ?
>>>
>>> Regards,
>>> Rajesh
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271 <(425)%20233-8271>
>> Twitter: https://twitter.com/holdenkarau
>>
>
>


JavaRDD text matadata(file name) findings

2017-01-31 Thread Manohar753
Hi All,
My Spark job is reading data from a folder containing different files with the
same data structure. The JavaRDD that is read is processed line by line, but is
there any way to know which file each line of data came from?
Thank you in advance for your reply.

Thanks,
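
[One possible approach, added here as an editorial sketch rather than a reply
from the thread; shown with the Scala API, though the same functions exist in
the Java API, and the input path is an assumption. The built-in
input_file_name() function tags each row with its source file, and for plain
RDDs sc.wholeTextFiles returns (path, content) pairs.]

import org.apache.spark.sql.functions.input_file_name

// DataFrame/Dataset route: one "source_file" column per line.
val lines = spark.read.textFile("/data/input/*")
  .withColumn("source_file", input_file_name())

// RDD route: one (path, fileContent) pair per file, split into lines while
// keeping the originating path. Best suited to many small files.
val linesWithPath = spark.sparkContext
  .wholeTextFiles("/data/input/*")
  .flatMap { case (path, content) => content.split("\n").map(line => (path, line)) }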



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/JavaRDD-text-matadata-file-name-findings-tp28353.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Resource Leak in Spark Streaming

2017-01-31 Thread Shixiong(Ryan) Zhu
Could you provide your Spark version please?

On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora 
wrote:

> Hi,
>
> I get a resource leak, where the number of file descriptors in spark
> streaming keeps increasing. We end up with a "too many file open" error
> eventually through an exception caused in:
>
> JAVARDDKafkaWriter, which is writing a spark JavaDStream
>
> The exception is attached inline. Any help will be greatly appreciated.
>
> Thanks
> Nipun
>
> ---
> Time: 148576253 ms
> ---
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
> failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
> java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-
> 42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
> (too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at org.apache.spark.storage.DiskBlockObjectWriter.open(
> DiskBlockObjectWriter.scala:88)
> at org.apache.spark.storage.DiskBlockObjectWriter.write(
> DiskBlockObjectWriter.scala:181)
> at org.apache.spark.util.collection.WritablePartitionedPairCollect
> ion$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
> at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(
> ExternalSorter.scala:659)
> at org.apache.spark.shuffle.sort.SortShuffleWriter.write(
> SortShuffleWriter.scala:72)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1431)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1419)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(
> DAGScheduler.scala:1418)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1418)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:799)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1640)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1599)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1588)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:920)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:918)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:150)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
> at org.apache.spark.api.java.JavaRDDLike$class.
> foreachPartition(JavaRDDLike.scala:225)
> at org.apache.spark.api.java.AbstractJavaRDDLike.
> foreachPartition(JavaRDDLike.scala:46)
> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
> JavaRDDStringKafkaWriter.java:25)
> at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(
> JavaRDDStringKafkaWriter.java:10)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.api.java.JavaDStreamLike$$
> anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$
> foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> at 

Resource Leak in Spark Streaming

2017-01-31 Thread Nipun Arora
Hi,

I get a resource leak, where the number of file descriptors in spark
streaming keeps increasing. We end up with a "too many file open" error
eventually through an exception caused in:

JAVARDDKafkaWriter, which is writing a spark JavaDStream

The exception is attached inline. Any help will be greatly appreciated.

Thanks
Nipun

---
Time: 148576253 ms
---

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent
failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost):
java.io.FileNotFoundException:
/tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084
(too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:221)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
at
org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
at
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at
org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
at
org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
at
org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
at
org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at

Re: Jars directory in Spark 2.0

2017-01-31 Thread Koert Kuipers
you basically have to keep your versions of dependencies in line with
sparks or shade your own dependencies.

you cannot just replace the jars in sparks jars folder. if you want to
update them you have to build spark yourself with updated dependencies and
confirm it compiles, passes tests etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
wrote:

> Hey,
>
> While migrating to Spark 2.X from 1.6, I've had many issues with jars that
> come preloaded with Spark in the "jars/" directory and I had to shade most
> of my packages.
>
> Can I replace the jars in this folder with more up-to-date versions? Are
> those jars used for anything internal in Spark, which means I can't blindly
> replace them?
>
>
>
> Thanks :)
>
>
>
>
>
> *Sidney Feiner*   */*  SW Developer
>
> M: +972.528197720 <+972%2052-819-7720>  */*  Skype: sidney.feiner.startapp
>
>
>
> [image: StartApp] 
>
>
> [image: Meet Us at] 
>


Unique Partition Id per partition

2017-01-31 Thread Chawla,Sumit
Hi All

I have an RDD which I partition based on some key, and then call sc.runJob
for each partition. Inside this function, I assign each partition a unique key
using the following:

"%s_%s" % (id(part), int(round(time.time())))

This is to make sure that each partition produces separate bookkeeping data,
which can be aggregated by an external system. However, I sometimes notice
multiple partition results pointing to the same partition_id. Is this an issue
due to the way the above code is serialized by PySpark? What's the best way to
define a unique id for each partition? I understand that it is the same executor
getting multiple partitions to process, but I would expect the above code to
produce a unique id for each partition.



Regards
Sumit Chawla
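
[A sketch of one alternative, added editorially and shown in Scala;
rdd.mapPartitionsWithIndex is available in PySpark as well, and appRunId is an
assumed application-level identifier. The partition index Spark passes in is
unique across the partitions of the RDD, so combining it with a per-run prefix
avoids relying on object identity and second-granularity timestamps.]

val appRunId = java.util.UUID.randomUUID().toString

val tagged = rdd.mapPartitionsWithIndex { (partitionIndex, records) =>
  // partitionIndex is unique per partition of this RDD, so the key below is
  // unique per partition and per run.
  val partitionKey = s"${appRunId}_$partitionIndex"
  records.map(record => (partitionKey, record))
}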


Multiple quantile calculations

2017-01-31 Thread Aaron Perrin
I want to calculate quantiles on two different columns.  I know that I can
calculate them with two separate operations. However, for performance
reasons, I'd like to calculate both with one operation.

Is it possible to do this with the Dataset API? I'm assuming that it
isn't. But, if it isn't, is it possible to calculate both in one pass,
assuming that I made some code changes? I briefly looked at the
approxQuantile code, but I haven't dug into the algorithm.
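
[One way to compute both in a single aggregation pass -- an illustrative sketch,
not a definitive answer; df, col_a and col_b are assumed names. The
percentile_approx SQL aggregate is a built-in from Spark 2.1 on, and both
aggregates below are evaluated in one scan of the data.]

val quantiles = df.selectExpr(
  "percentile_approx(col_a, array(0.25, 0.5, 0.75)) AS col_a_quantiles",
  "percentile_approx(col_b, array(0.25, 0.5, 0.75)) AS col_b_quantiles"
)
quantiles.show(false)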


Roadblock -- stuck for 10 days :( how come same hive udf giving different results in spark and hive

2017-01-31 Thread Alex
Hi All,

I am trying to run a Hive UDF in spark-sql, and it is returning different rows
as a result in Hive and in Spark.

My UDF query looks something like this

select col1,col2,col3, sum(col4) col4, sum(col5) col5,Group_name
from
(select inline(myudf('cons1',record))
from table1) test group by col1,col2,col3;

The results are the same up to a point: if I run only the subquery below, both
give the same output:

(select inline(myudf('cons1',record))
from table1) test group by col1,col2,col3;

But if I pass the entire script, it gives different outputs in Hive and in
Spark:


select col1,col2,col3, sum(col4) col4, sum(col5) col5,Group_name
from
(select inline(myudf('cons1',record))
from table1) test group by col1,col2,col3;

how come? :(


Error when loading saved ml model on pyspark (2.0.1)

2017-01-31 Thread Matheus Braun Magrin
Hi there,

I've posted this question on StackOverflow as well but I got no answers,
maybe you guys can help me out.

I'm building a Random Forest model using Spark and I want to save it to use
again later. I'm running this on pyspark (Spark 2.0.1) without HDFS, so the
files are saved to the local file system.

I've tried to do it like so:

import pyspark.sql.types as T
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

data = [[0, 0, 0.],
        [0, 1, 1.],
        [1, 0, 1.],
        [1, 1, 0.]]

schema = T.StructType([
    T.StructField('a', T.IntegerType(), True),
    T.StructField('b', T.IntegerType(), True),
    T.StructField('label', T.DoubleType(), True)])

df = sqlContext.createDataFrame(data, schema)

assembler = VectorAssembler(inputCols=['a', 'b'], outputCol='features')
df = assembler.transform(df)

classifier = RandomForestClassifier(numTrees=10, maxDepth=15,
                                    labelCol='label', featuresCol='features')
model = classifier.fit(df)

model.write().overwrite().save('saved_model')


And then, to load the model:

from pyspark.ml.classification import RandomForestClassificationModel

loaded_model = RandomForestClassificationModel.load('saved_model')


But I get this error:

Py4JJavaError: An error occurred while calling o108.load.
: java.lang.UnsupportedOperationException: empty collection

I'm not sure which collection it is referring to. Any ideas on how to
properly load (or save) the model?

Cheers,
--
Matheus Braun Magrin


Re: ML version of Kmeans

2017-01-31 Thread Aseem Bansal
If you want to predict using dataset then transform is the way to go. If
you want to predict on vectors then you will have to wait on this issue to
be completed https://issues.apache.org/jira/browse/SPARK-10413

On Tue, Jan 31, 2017 at 3:01 PM, Holden Karau  wrote:

> You most likely want the transform function on KMeansModel (although that
> works on a dataset input rather than a single element at a time).
>
> On Tue, Jan 31, 2017 at 1:24 AM, Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> I am not able to find predict method on "ML" version of Kmeans.
>>
>> Mllib version has a predict method.  KMeansModel.predict(point: Vector)
>> .
>> How to predict the cluster point for new vectors in ML version of kmeans ?
>>
>> Regards,
>> Rajesh
>>
>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Re: ML version of Kmeans

2017-01-31 Thread Holden Karau
You most likely want the transform function on KMeansModel (although that
works on a dataset input rather than a single element at a time).
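
[A minimal sketch of the transform-based flow; illustrative only, where
trainingDf and newDf are assumed DataFrames with a "features" vector column.]

import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans().setK(3).setFeaturesCol("features")
val model = kmeans.fit(trainingDf)

// "Predict" for new vectors by putting them in a DataFrame and calling
// transform; the assigned cluster index comes back in the "prediction" column.
val predictions = model.transform(newDf)
predictions.select("features", "prediction").show()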

On Tue, Jan 31, 2017 at 1:24 AM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> I am not able to find predict method on "ML" version of Kmeans.
>
> Mllib version has a predict method.  KMeansModel.predict(point: Vector)
> .
> How to predict the cluster point for new vectors in ML version of kmeans ?
>
> Regards,
> Rajesh
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


ML version of Kmeans

2017-01-31 Thread Madabhattula Rajesh Kumar
Hi,

I am not able to find predict method on "ML" version of Kmeans.

Mllib version has a predict method.  KMeansModel.predict(point: Vector)
.
How to predict the cluster point for new vectors in ML version of kmeans ?

Regards,
Rajesh


Jars directory in Spark 2.0

2017-01-31 Thread Sidney Feiner
Hey,
While migrating to Spark 2.X from 1.6, I've had many issues with jars that come 
preloaded with Spark in the "jars/" directory and I had to shade most of my 
packages.
Can I replace the jars in this folder with more up-to-date versions? Are those 
jars used for anything internal in Spark, which means I can't blindly replace 
them?

Thanks :)


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp

[StartApp]

[Meet Us at]