How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-17 Thread kant kodali
Hi All,


I would like to flatten JSON blobs into a DataFrame using Spark/Spark SQL
inside spark-shell.

val df = spark.sql("select body from test limit 3") // body is a JSON-encoded blob column

val df2 = df.select(df("body").cast(StringType).as("body"))


when I do

df2.show // shows the 3 rows


body
----------------------------------------
{"k1": "v1", "k2": "v2" }
{"k3": "v3"}
{"k4": "v4", "k5": "v5", "k6": "v6"}
----------------------------------------


Now say I have a billion of these rows/records, but at most there will be 5
different JSON schemas across all billion rows. How do I flatten them so that
I get a data frame in the format below? Should I use df.foreach,
df.foreachPartition, df.explode, or df.flatMap? How do I make sure I am not
creating a billion data frames and trying to union all of them, or something
even more inefficient? It would be great if I could see a code sample. Also,
since the result might contain nils/nulls, I wonder whether they would take up
any space? I initially tried it with UDFs and kept getting an exception saying
the "Any" type is not supported, and so on. So an example of how to do this
would help!


"K1" | "K2" | "K3" | "K4" | "K5" | "K6"
---
"V1" | "V2" |
| "V3" |
   | "V4" | "V5" | "V6"

Needless to say, V1, V2, V3...V6 can be any type (String, boolean, integer,
Map...). Also open to any new ideas.
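
For illustration only, a minimal sketch of one possible approach, assuming the
df2 above inside spark-shell: feeding the JSON strings to the JSON reader merges
the handful of schemas into a single DataFrame, with null (which costs next to
nothing) for keys a given row does not have.

import spark.implicits._                          // already available in spark-shell

val jsonStrings = df2.select("body").as[String]   // Dataset[String] of the JSON blobs
val flattened = spark.read.json(jsonStrings.rdd)  // one pass, no per-row data frames
flattened.printSchema()
flattened.show()

As mentioned elsewhere in this digest, the from_json/to_json built-ins slated for
Spark 2.1 would be another option once available.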


Thanks!


sort descending with multiple columns

2016-11-17 Thread Sreekanth Jella
Hi,

I'm trying to sort on multiple columns, and the column names are dynamic.

df.sort(colList.head, colList.tail: _*)


But I'm not sure how to sort all columns in descending order. I tried this,
but it sorts only the first column:

df.sort(df.col(colList.head).desc)

How can I pass all column names (or some of them) with descending order?
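
For reference, a minimal sketch of one way to do it, assuming colList: Seq[String] as above:

import org.apache.spark.sql.functions.col

// build a descending sort expression per column and splat them into sort()
val sortedDf = df.sort(colList.map(c => col(c).desc): _*)
// mixing directions also works, e.g. df.sort(col("a").desc, col("b").asc)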


Thanks,
Sreekanth


Re: Configure spark.kryoserializer.buffer.max at runtime does not take effect

2016-11-17 Thread kant kodali
Yeah, I feel like this is a bug, since you can't really modify the settings
once you have been handed a SparkSession or SparkContext. So the workaround
would be to use --conf. In your case it would be like this:

./spark-shell --conf spark.kryoserializer.buffer.max=1g
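
another option sometimes tried inside spark-shell (a hedged sketch; the --conf
flag above is the simpler route): stop the pre-created session and rebuild it
with the setting before running anything else.

spark.stop()   // stops the shell's pre-created session and its SparkContext
val spark2 = org.apache.spark.sql.SparkSession.builder
  .config("spark.kryoserializer.buffer.max", "1g")
  .getOrCreate()
// note: the shell's original `spark`/`sc` references are now stopped; use spark2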



On Thu, Nov 17, 2016 at 1:59 PM, Koert Kuipers  wrote:

> getOrCreate uses existing SparkSession if available, in which case the
> settings will be ignored
>
> On Wed, Nov 16, 2016 at 10:55 PM, bluishpenguin  > wrote:
>
>> Hi all,
>> I would like to configure the following setting during runtime as below:
>>
>> spark = (SparkSession
>> .builder
>> .appName("ElasticSearchIndex")
>> .config("spark.kryoserializer.buffer.max", "1g")
>> .getOrCreate())
>>
>> But I still hit error,
>> Caused by: org.apache.spark.SparkException: Kryo serialization failed:
>> Buffer overflow. Available: 0, required: 1614707. To avoid this, increase
>> spark.kryoserializer.buffer.max value.
>>
>> It works when configure it along with the spark-submit command as below:
>> spark-submit pyspark-shell-main --name "PySparkShell" --conf
>> spark.kryoserializer.buffer.max=1g
>>
>> Any idea what have I done wrong?
>>
>> Thank you.
>>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Configure-spark-kryoserializer-buffer-
>> max-at-runtime-does-not-take-effect-tp28094.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Is selecting different datasets from same parquet file blocking.

2016-11-17 Thread Rohit Verma
Hi

I have a dataset with 10 columns, created from a parquet file.
I want to perform some operations on each column.

I create 10 datasets as dsBig.select(col).

When I submit these 10 jobs, will they block each other since all of them are
reading from the same parquet file? Is selecting different datasets from the
same parquet file blocking?

Would it be better to cache on the first read, as in dsBig.cache().select(col1)?
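
A rough sketch of the cached variant (the path and the per-column action are
placeholders): caching the parent dataset once lets the ten column jobs share
the in-memory copy instead of each re-reading the parquet file.

val dsBig = spark.read.parquet("/path/to/data.parquet").cache()  // hypothetical path
dsBig.count()                                                    // materialize the cache once

val perColumn = dsBig.columns.map(c => dsBig.select(c))          // one dataset per column
perColumn.foreach(ds => println(ds.columns.head + ": " + ds.count()))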

Regards
Rohit


GraphX Pregel not update vertex state properly, cause messages loss

2016-11-17 Thread fuz_woo
Hi everyone, I encountered a strange problem these days while attempting
to use the GraphX Pregel interface to implement a simple
single-source shortest-path algorithm.
Below is my code:

import com.alibaba.fastjson.JSONObject
import org.apache.spark.graphx._

import org.apache.spark.{SparkConf, SparkContext}

object PregelTest {

  def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject,
JSONObject] = {

def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
  if ( msg < 0 ) {
// init message received
if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
else attr.put("LENGTH", Integer.MAX_VALUE)
  } else {
attr.put("LENGTH", msg+1)
  }
  attr
}

def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]):
Iterator[(VertexId, Integer)] = {
  val len = triplet.srcAttr.getInteger("LENGTH")
  // send a msg if last hub is reachable
  if ( len ...

It seems that the messages sent to vertex 2 were lost unexpectedly. I then
followed the debugger into the file Pregel.scala, where I saw the code:


 
 
In the first iteration (iteration 0), the variable messages in line 138 is
reconstructed and then recomputed in line 143, where activeMessages gets the
value 0, which means the messages are lost.
Then I set a breakpoint at line 138, and before it executes I evaluate the
expression " g.triplets().collect() ", which just collects the updated graph
data. After I do this and execute the rest of the code, the messages are no
longer empty and activeMessages gets the value 1, as expected.

I have tested the code with both Spark 1.4 and 1.6 on Scala 2.10,
and got the same result.

I must say this problem leaves me really confused. I've spent almost 2 weeks
trying to resolve it and have no idea what to do now. If this is not a bug, I
totally can't understand why just executing a non-intrusive expression
( g.triplets().collect(), which only collects the data and does no computation )
could change the outcome; it's really ridiculous.
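
For comparison while debugging, here is a minimal hop-count SSSP over Pregel
using plain Int vertex attributes. It is only an independent sketch assuming a
spark-shell `sc` and a tiny made-up edge list, not the code above:

import org.apache.spark.graphx._

val edges = sc.parallelize(Seq(Edge(0L, 1L, 1), Edge(1L, 2L, 1), Edge(1L, 3L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = Int.MaxValue)
  .mapVertices((id, _) => if (id == 0L) 0 else Int.MaxValue)   // source vertex 0 at distance 0

val sssp = graph.pregel(Int.MaxValue, activeDirection = EdgeDirection.Out)(
  (id, dist, msg) => math.min(dist, msg),                      // vertex program: keep the shorter distance
  triplet =>                                                   // send only along edges whose source is reachable
    if (triplet.srcAttr != Int.MaxValue && triplet.srcAttr + 1 < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    else
      Iterator.empty,
  (a, b) => math.min(a, b)                                     // merge messages: take the minimum
)
sssp.vertices.collect().foreach(println)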



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Map and MapPartitions with partition-local variable

2016-11-17 Thread Rohit Verma
Using a map and a mapPartitions on the same df at the same time doesn't make much
sense to me.
Also, without complete info, I am assuming that you have some partition strategy
being defined/influenced by the map operation. In that case you can create a
hashmap of the values produced by map for each partition, do mapPartitionsWithIndex,
broadcast the hashmap, and in each partition retrieve the required value from the
map, as in the sketch below.
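
A rough, hypothetical sketch of that idea (the names and the per-partition
aggregate are made up; it assumes the same RDD keeps the same partitioning
across both passes):

val rdd = spark.sparkContext.parallelize(1 to 100, 4)

// first pass: compute one value per partition, keyed by partition index
val perPartition: Map[Int, Long] = rdd
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.map(_.toLong).sum)))
  .collect()
  .toMap

val bcast = spark.sparkContext.broadcast(perPartition)

// second pass: each partition looks up its own value from the broadcast map
val result = rdd.mapPartitionsWithIndex { (idx, it) =>
  val local = bcast.value(idx)
  it.map(_ + local)
}
result.take(5).foreach(println)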

Rohit

On 18-Nov-2016 2:27 AM, Zsolt Tóth  wrote:

Any comment on this one?

2016. nov. 16. du. 12:59 ezt írta ("Zsolt Tóth" 
>):
Hi,

I need to run a map() and a mapPartitions() on my input DF. As a side-effect of 
the map(), a partition-local variable should be updated, that is used in the 
mapPartitions() afterwards.
I can't use Broadcast variable, because it's shared between partitions on the 
same executor.

Where can I define this variable?
I could run a single mapPartitions() that defines the variable, iterates over 
the input (just as the map() would do), collect the result into an ArrayList, 
and then use the list's iterator (and the updated partition-local variable) as 
the input of the transformation that the original mapPartitions() did.

It feels, however, that this is not as optimal as running map()+mapPartitions(),
because I need to store the ArrayList (which is basically the whole data in the
partition) in memory.

Thanks,
Zsolt



Re: Long-running job OOMs driver process

2016-11-17 Thread Alexis Seigneurin
Hi Irina,

I would question the use of multiple threads in your application. Since
Spark is going to run the processing of each DataFrame on all the cores of
your cluster, the processes will be competing for resources. In fact, they
would not only compete for CPU cores but also for memory.

Spark is designed to run your processes in a sequence, and each process
will be run in a distributed manner (multiple threads on multiple
instances). I would suggest to follow this principle.

Feel free to share to code if you can. It's always helpful so that we can
give better advice.

Alexis

On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong  wrote:

> We have an application that reads text files, converts them to dataframes,
> and saves them in Parquet format. The application runs fine when processing
> a few files, but we have several thousand produced every day. When running
> the job for all files, we have spark-submit killed on OOM:
>
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing /bin/sh -c "kill -9 27226"...
>
> The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
> 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
> instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
> and 30g of RAM each). Spark config settings are as follows:
>
> ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
>
> ('spark.executors.instances', '3'),
>
> ('spark.yarn.executor.memoryOverhead', '9g'),
>
> ('spark.executor.cores', '15'),
>
> ('spark.executor.memory', '12g'),
>
> ('spark.scheduler.mode', 'FIFO'),
>
> ('spark.cleaner.ttl', '1800'),
>
> The job processes each file in a thread, and we have 10 threads running
> concurrently. The process will OOM after about 4 hours, at which point
> Spark has processed over 20,000 jobs.
>
> It seems like the driver is running out of memory, but each individual job
> is quite small. Are there any known memory leaks for long-running Spark
> applications on Yarn?
>



-- 

*Alexis Seigneurin*
*Managing Consultant*
(202) 459-1591 - LinkedIn



Rate our service 


Re: How to propagate R_LIBS to sparkr executors

2016-11-17 Thread Felix Cheung
Have you tried
spark.executorEnv.R_LIBS?

spark.apache.org/docs/latest/configuration.html#runtime-environment

_
From: Rodrick Brown
Sent: Wednesday, November 16, 2016 1:01 PM
Subject: How to propagate R_LIBS to sparkr executors
To:


I'm having an issue with an R module not getting picked up on the slave nodes in
Mesos. I have the environment variable R_LIBS set, but for some reason it is only
set in the driver context and not on the executors. Is there a way to pass
environment values down to the executor level in SparkR?

I'm using Mesos 1.0.1 and Spark 2.0.1

Thanks.


--

[Orchard Platform]

Rodrick Brown / Site Reliability Engineer
+1 917 445 6839 / 
rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY 10003
http://www.orchardplatform.com

Orchard Blog | Marketplace Lending Meetup






Long-running job OOMs driver process

2016-11-17 Thread Irina Truong
We have an application that reads text files, converts them to dataframes,
and saves them in Parquet format. The application runs fine when processing
a few files, but we have several thousand produced every day. When running
the job for all files, we have spark-submit killed on OOM:

#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 27226"...

The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
and 30g of RAM each). Spark config settings are as follows:

('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),

('spark.executors.instances', '3'),

('spark.yarn.executor.memoryOverhead', '9g'),

('spark.executor.cores', '15'),

('spark.executor.memory', '12g'),

('spark.scheduler.mode', 'FIFO'),

('spark.cleaner.ttl', '1800'),

The job processes each file in a thread, and we have 10 threads running
concurrently. The process will OOM after about 4 hours, at which point
Spark has processed over 20,000 jobs.

It seems like the driver is running out of memory, but each individual job
is quite small. Are there any known memory leaks for long-running Spark
applications on Yarn?


kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a single partition on a multi partition topic

2016-11-17 Thread Hster Geguri
Our team is trying to upgrade to Spark 2.0.2/Kafka 0.10.1.0 and we have
been struggling with this show stopper problem.

When we run our drivers with auto.offset.reset=latest ingesting from a
single kafka topic with 10 partitions, the driver reads correctly from all
10 partitions.

However when we use auto.offset.reset=earliest, the driver will read only a
single partition.

When we turn on the debug logs, we sometimes see partitions being set to
different offset configuration even though the consumer config correctly
indicates auto.offset.reset=earliest.

8 DEBUG Resetting offset for *partition simple_test-8 to earliest offset*.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for *partition simple_test-9 to latest offset*.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> to broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> to broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> from broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> from broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> (org.apache.kafka.clients.consumer.internals.Fetcher)



I've enclosed below the completely stripped down trivial test driver that
shows this behavior. We normally run with YARN 2.7.3 but have also tried
running spark standalone mode which has the same behavior. Our drivers are
normally java but we have tried the scala version which also has the same
incorrect behavior. We have tried different LocationStrategies and
partition assignment strategies all without success.  Any insight would be
greatly appreciated.

package com.x.labs.analytics.diagnostics.spark.drivers

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies
import org.apache.spark.streaming.kafka010.ConsumerStrategies


/**
  *
  * This driver is only for pulling data from the stream and logging
to output just to isolate single partition bug
  */
object SimpleKafkaLoggingDriver {
  def main(args: Array[String]) {
if (args.length != 4) {
  System.err.println("Usage: SimpleTestDriver")
  System.exit(1)
}

val Array(brokers, topic, groupId, offsetReset) = args
val preferredHosts = LocationStrategies.PreferConsistent
val topics = List(topic)

val kafkaParams = Map(
  "bootstrap.servers" -> brokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> offsetReset,
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val sparkConf = new SparkConf().setAppName("SimpleTestDriver"+"_" +topic)
val streamingContext = new StreamingContext(sparkConf, Seconds(5))


val dstream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))

dstream.foreachRDD { rdd =>
  // Get the offset ranges in the RDD and log
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset}
to ${o.untilOffset}")
  }
}

streamingContext.start
streamingContext.awaitTermination()

  }

}



16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:

auto.commit.interval.ms = 5000

auto.offset.reset = earliest

bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]

check.crcs = true

client.id =

connections.max.idle.ms = 54

enable.auto.commit = false

exclude.internal.topics = true

fetch.max.bytes = 52428800

fetch.max.wait.ms = 500

fetch.min.bytes = 1

group.id = simple_test_group

heartbeat.interval.ms = 3000

interceptor.classes = null

key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer

max.partition.fetch.bytes = 1048576


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-17 Thread shyla deshpande
Thanks Zhu, That was it. Now works great!

On Thu, Nov 17, 2016 at 1:07 PM, Shixiong(Ryan) Zhu  wrote:

> The problem is "optional Gender gender = 3;". The generated class "Gender"
> is a trait, and Spark cannot know how to create a trait so it's not
> supported. You can define your class which is supported by SQL Encoder, and
> convert this generated class to the new class in `parseLine`.
>
> On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande  > wrote:
>
>> Ryan,
>>
>> I just wanted to provide more info. Here is my .proto file which is the
>> basis for generating the Person class. Thanks.
>>
>> option java_package = "com.example.protos";
>> enum Gender {
>> MALE = 1;
>> FEMALE = 2;
>> }
>> message Address {
>> optional string street = 1;
>> optional string city = 2;
>> }
>> message Person {
>> optional string name = 1;
>> optional int32 age = 2;
>> optional Gender gender = 3;
>> repeated string tags = 4;
>> repeated Address addresses = 5;
>> }
>>
>>
>> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> *Thanks for the response. Following is the Person class..*
>>>
>>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>>> // Do not edit!
>>> //
>>> // Protofile syntax: PROTO2
>>>
>>> package com.example.protos.demo
>>>
>>>
>>>
>>> @SerialVersionUID(0L)
>>> final case class Person(
>>> name: scala.Option[String] = None,
>>> age: scala.Option[Int] = None,
>>> gender: scala.Option[com.example.protos.demo.Gender] = None,
>>> tags: scala.collection.Seq[String] = Nil,
>>> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>>> ) extends com.trueaccord.scalapb.GeneratedMessage with 
>>> com.trueaccord.scalapb.Message[Person] with 
>>> com.trueaccord.lenses.Updatable[Person] {
>>> @transient
>>> private[this] var __serializedSizeCachedValue: Int = 0
>>> private[this] def __computeSerializedValue(): Int = {
>>>   var __size = 0
>>>   if (name.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>>>   if (age.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>>>   if (gender.isDefined) { __size += 
>>> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>>>   tags.foreach(tags => __size += 
>>> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>>>   addresses.foreach(addresses => __size += 1 + 
>>> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>>>  + addresses.serializedSize)
>>>   __size
>>> }
>>> final override def serializedSize: Int = {
>>>   var read = __serializedSizeCachedValue
>>>   if (read == 0) {
>>> read = __computeSerializedValue()
>>> __serializedSizeCachedValue = read
>>>   }
>>>   read
>>> }
>>> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = 
>>> {
>>>   name.foreach { __v =>
>>> _output__.writeString(1, __v)
>>>   };
>>>   age.foreach { __v =>
>>> _output__.writeInt32(2, __v)
>>>   };
>>>   gender.foreach { __v =>
>>> _output__.writeEnum(3, __v.value)
>>>   };
>>>   tags.foreach { __v =>
>>> _output__.writeString(4, __v)
>>>   };
>>>   addresses.foreach { __v =>
>>> _output__.writeTag(5, 2)
>>> _output__.writeUInt32NoTag(__v.serializedSize)
>>> __v.writeTo(_output__)
>>>   };
>>> }
>>> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
>>> com.example.protos.demo.Person = {
>>>   var __name = this.name
>>>   var __age = this.age
>>>   var __gender = this.gender
>>>   val __tags = (scala.collection.immutable.Vector.newBuilder[String] 
>>> ++= this.tags)
>>>   val __addresses = 
>>> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>>>  ++= this.addresses)
>>>   var _done__ = false
>>>   while (!_done__) {
>>> val _tag__ = _input__.readTag()
>>> _tag__ match {
>>>   case 0 => _done__ = true
>>>   case 10 =>
>>> __name = Some(_input__.readString())
>>>   case 16 =>
>>> __age = Some(_input__.readInt32())
>>>   case 24 =>
>>> __gender = 
>>> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>>>   case 34 =>
>>> __tags += _input__.readString()
>>>   case 42 =>
>>> __addresses += 
>>> com.trueaccord.scalapb.LiteParser.readMessage(_input__, 
>>> com.example.protos.demo.Address.defaultInstance)
>>>   case tag => _input__.skipField(tag)
>>> }
>>>   }
>>>   com.example.protos.demo.Person(
>>>   name = __name,
>>>   age = __age,
>>>   gender = __gender,
>>>   tags = 

Re: Configure spark.kryoserializer.buffer.max at runtime does not take effect

2016-11-17 Thread Koert Kuipers
getOrCreate uses existing SparkSession if available, in which case the
settings will be ignored

On Wed, Nov 16, 2016 at 10:55 PM, bluishpenguin 
wrote:

> Hi all,
> I would like to configure the following setting during runtime as below:
>
> spark = (SparkSession
> .builder
> .appName("ElasticSearchIndex")
> .config("spark.kryoserializer.buffer.max", "1g")
> .getOrCreate())
>
> But I still hit error,
> Caused by: org.apache.spark.SparkException: Kryo serialization failed:
> Buffer overflow. Available: 0, required: 1614707. To avoid this, increase
> spark.kryoserializer.buffer.max value.
>
> It works when configure it along with the spark-submit command as below:
> spark-submit pyspark-shell-main --name "PySparkShell" --conf
> spark.kryoserializer.buffer.max=1g
>
> Any idea what have I done wrong?
>
> Thank you.
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Configure-spark-kryoserializer-buffer-max-at-
> runtime-does-not-take-effect-tp28094.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread ayan guha
Hi

I think you can use the map-reduce paradigm here. Create a key using user ID
and date, and the record as the value. Then you can express your operation (the
"do something" part) as a function. If the function meets certain criteria, such
as being associative and commutative (say, addition or multiplication), you can
use reduceByKey; otherwise you may use groupByKey. A rough sketch follows.

HTH
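
For illustration only, a sketch of the keyed approach with a hypothetical input
path and field handling (the real parsing depends on the CSV layout):

val rows = sc.textFile("hdfs://path/to/csvs")      // hypothetical input path
  .map(_.split(","))
  .collect { case Array(userId, date, txn) => ((userId, date), txn) }

// associative and commutative aggregate, e.g. counting transactions per (userid, date)
val counts = rows.mapValues(_ => 1L).reduceByKey(_ + _)

// otherwise, gather all transactions for each (userid, date) tuple
val grouped = rows.groupByKey()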
On 18 Nov 2016 06:45, "titli batali"  wrote:

>
> That would help but again in a particular partitions i would need to a
> iterate over the customers having first n letters of user id in that
> partition. I want to get rid of nested iterations.
>
> Thanks
>
> On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan  wrote:
>
>> You can partitioned on the first n letters of userid
>>
>> On 17 November 2016 at 08:25, titli batali  wrote:
>>
>>> Hi,
>>>
>>> I have a use case, where we have 1000 csv files with a column user_Id,
>>> having 8 million unique users. The data contains: userid,date,transaction,
>>> where we run some queries.
>>>
>>> We have a case where we need to iterate for each transaction in a
>>> particular date for each user. There is three nesting loops
>>>
>>> for(user){
>>>   for(date){
>>> for(transactions){
>>>   //Do Something
>>>   }
>>>}
>>> }
>>>
>>> i.e we do similar thing for every (date,transaction) tuple for a
>>> particular user. In order to get away from the loop structure and decrease the
>>> processing time, we are converting the csv files to parquet and
>>> partitioning them by userid:
>>> df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").
>>>
>>> So that while reading the parquet files, we read a particular user in a
>>> particular partition and create a Cartesian product of (date X transaction)
>>> and work on the tuple in each partition, to achieve the above level of
>>> nesting. Partitioning on 8 million users is it a bad option. What could be
>>> a better way to achieve this?
>>>
>>> Thanks
>>>
>>>
>>>
>>
>>
>


Re: HiveContext.getOrCreate not accessible

2016-11-17 Thread Shixiong(Ryan) Zhu
`SQLContext.getOrCreate` will return the HiveContext you created.
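
A hedged sketch against the snippet below (assuming the rest of the streaming
setup stays as posted): create the HiveContext once with the SparkContext, and
inside foreachRDD fetch it back via SQLContext.getOrCreate, which should return
that same instance.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

smSplitted.foreachRDD { rdd =>
  // returns the already-created context (the HiveContext above), typed as SQLContext
  val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  // ... use sqlContext here
}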

On Mon, Nov 14, 2016 at 11:17 PM, Praseetha  wrote:

>
> Hi All,
>
>
> I have a streaming app and when i try invoking the
> HiveContext.getOrCreate, it errors out with the following stmt. 'object
> HiveContext in package hive cannot be accessed in package
> org.apache.spark.sql.hive'
>
> I would require HiveContext instead of SQLContext for my application and
> creating new HiveContext everytime would not be a feasible solution.
>
> Here is my code snippet:
> object sampleStreamingApp  {
>
>   def createStreamingContext(checkpointDirectory: String):
> StreamingContext = {
> val conf = new SparkConf().setAppName("sampleStreaming")
> val sc = new SparkContext(conf)
> val ssc = new StreamingContext(sc, Milliseconds(5000))
> ssc.checkpoint(checkpointDirectory)
> val smDStream = ssc.textFileStream("/user/hdpuser/data")
> val smSplitted = smDStream.map( x => x.split(";") ).map( x => Row.fromSeq(
> x ) )
> smSplitted.foreachRDD { rdd =>
>  val sqlContext = HiveContext.getOrCreate(rdd.
> sparkContext)
> import sqlContext.implicits._
> 
> }
> }
> ssc
>   }
>
>   def main(args: Array[String]) {
>   val checkpointDirectory = "hdfs://localhost:8020/user/
> dfml/checkpointing/AAA"
>   val ssc = StreamingContext.getActiveOrCreate(checkpointDirectory, () =>
> createStreamingContext(checkpointDirectory))
>
>   ssc.start()
>   ssc.awaitTermination()
>   }
> }
>
> Any help would be appreciated.
>
> Regds,
> --Praseetha
>


Re: Spark 2.0.2 with Kafka source, Error please help!

2016-11-17 Thread Shixiong(Ryan) Zhu
The problem is "optional Gender gender = 3;". The generated class "Gender"
is a trait, and Spark cannot know how to create a trait so it's not
supported. You can define your class which is supported by SQL Encoder, and
convert this generated class to the new class in `parseLine`.
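
As an illustration only (the field mapping here is a guess, not the poster's
actual code), the conversion could look something like this: a plain case class
the SQL encoder understands, plus a small function applied in the parse step.

// hypothetical flat row type; gender kept as its string name
case class PersonRow(name: Option[String], age: Option[Int],
                     gender: Option[String], tags: Seq[String])

def toRow(p: com.example.protos.demo.Person): PersonRow =
  PersonRow(p.name, p.age, p.gender.map(_.toString), p.tags)

// e.g. ds1.map(row => toRow(Person.parseFrom(row.getAs[Array[Byte]]("value"))))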

On Wed, Nov 16, 2016 at 4:22 PM, shyla deshpande 
wrote:

> Ryan,
>
> I just wanted to provide more info. Here is my .proto file which is the
> basis for generating the Person class. Thanks.
>
> option java_package = "com.example.protos";
> enum Gender {
> MALE = 1;
> FEMALE = 2;
> }
> message Address {
> optional string street = 1;
> optional string city = 2;
> }
> message Person {
> optional string name = 1;
> optional int32 age = 2;
> optional Gender gender = 3;
> repeated string tags = 4;
> repeated Address addresses = 5;
> }
>
>
> On Wed, Nov 16, 2016 at 3:04 PM, shyla deshpande  > wrote:
>
>> *Thanks for the response. Following is the Person class..*
>>
>> // Generated by the Scala Plugin for the Protocol Buffer Compiler.
>> // Do not edit!
>> //
>> // Protofile syntax: PROTO2
>>
>> package com.example.protos.demo
>>
>>
>>
>> @SerialVersionUID(0L)
>> final case class Person(
>> name: scala.Option[String] = None,
>> age: scala.Option[Int] = None,
>> gender: scala.Option[com.example.protos.demo.Gender] = None,
>> tags: scala.collection.Seq[String] = Nil,
>> addresses: scala.collection.Seq[com.example.protos.demo.Address] = Nil
>> ) extends com.trueaccord.scalapb.GeneratedMessage with 
>> com.trueaccord.scalapb.Message[Person] with 
>> com.trueaccord.lenses.Updatable[Person] {
>> @transient
>> private[this] var __serializedSizeCachedValue: Int = 0
>> private[this] def __computeSerializedValue(): Int = {
>>   var __size = 0
>>   if (name.isDefined) { __size += 
>> com.google.protobuf.CodedOutputStream.computeStringSize(1, name.get) }
>>   if (age.isDefined) { __size += 
>> com.google.protobuf.CodedOutputStream.computeInt32Size(2, age.get) }
>>   if (gender.isDefined) { __size += 
>> com.google.protobuf.CodedOutputStream.computeEnumSize(3, gender.get.value) }
>>   tags.foreach(tags => __size += 
>> com.google.protobuf.CodedOutputStream.computeStringSize(4, tags))
>>   addresses.foreach(addresses => __size += 1 + 
>> com.google.protobuf.CodedOutputStream.computeUInt32SizeNoTag(addresses.serializedSize)
>>  + addresses.serializedSize)
>>   __size
>> }
>> final override def serializedSize: Int = {
>>   var read = __serializedSizeCachedValue
>>   if (read == 0) {
>> read = __computeSerializedValue()
>> __serializedSizeCachedValue = read
>>   }
>>   read
>> }
>> def writeTo(`_output__`: com.google.protobuf.CodedOutputStream): Unit = {
>>   name.foreach { __v =>
>> _output__.writeString(1, __v)
>>   };
>>   age.foreach { __v =>
>> _output__.writeInt32(2, __v)
>>   };
>>   gender.foreach { __v =>
>> _output__.writeEnum(3, __v.value)
>>   };
>>   tags.foreach { __v =>
>> _output__.writeString(4, __v)
>>   };
>>   addresses.foreach { __v =>
>> _output__.writeTag(5, 2)
>> _output__.writeUInt32NoTag(__v.serializedSize)
>> __v.writeTo(_output__)
>>   };
>> }
>> def mergeFrom(`_input__`: com.google.protobuf.CodedInputStream): 
>> com.example.protos.demo.Person = {
>>   var __name = this.name
>>   var __age = this.age
>>   var __gender = this.gender
>>   val __tags = (scala.collection.immutable.Vector.newBuilder[String] ++= 
>> this.tags)
>>   val __addresses = 
>> (scala.collection.immutable.Vector.newBuilder[com.example.protos.demo.Address]
>>  ++= this.addresses)
>>   var _done__ = false
>>   while (!_done__) {
>> val _tag__ = _input__.readTag()
>> _tag__ match {
>>   case 0 => _done__ = true
>>   case 10 =>
>> __name = Some(_input__.readString())
>>   case 16 =>
>> __age = Some(_input__.readInt32())
>>   case 24 =>
>> __gender = 
>> Some(com.example.protos.demo.Gender.fromValue(_input__.readEnum()))
>>   case 34 =>
>> __tags += _input__.readString()
>>   case 42 =>
>> __addresses += 
>> com.trueaccord.scalapb.LiteParser.readMessage(_input__, 
>> com.example.protos.demo.Address.defaultInstance)
>>   case tag => _input__.skipField(tag)
>> }
>>   }
>>   com.example.protos.demo.Person(
>>   name = __name,
>>   age = __age,
>>   gender = __gender,
>>   tags = __tags.result(),
>>   addresses = __addresses.result()
>>   )
>> }
>> def getName: String = name.getOrElse("")
>> def clearName: Person = copy(name = None)
>> def withName(__v: String): Person = copy(name = Some(__v))
>> def getAge: Int = 

Re: Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-17 Thread shyla deshpande
Hello everyone,
 The following code works ...

def main(args : Array[String]) {

  val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()

  import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","student").load()

  val ds2 = ds1.map(row=>
row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

  val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()

  query.awaitTermination()

}


On Thu, Nov 17, 2016 at 11:30 AM, shyla deshpande 
wrote:

> val spark = SparkSession.builder.
>   master("local")
>   .appName("spark session example")
>   .getOrCreate()
>
> import spark.implicits._
>
> val dframe1 = spark.readStream.format("kafka").
>   option("kafka.bootstrap.servers","localhost:9092").
>   option("subscribe","student").load()
>
> *How do I deserialize the value column from dataframe1 *
>
> *which is Array[Byte] to Student object using Student.parseFrom..???*
>
> *Please help.*
>
> *Thanks.*
>
>
>
> // Stream of votes from Kafka as bytes
> val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
>   ssc, LocationStrategies.PreferConsistent,
>   ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
> // Parse them into Vote case class.
> val votes: DStream[Vote] = votesAsBytes.map {
>   (cr: ConsumerRecord[String, Array[Byte]]) =>
>     Vote.parseFrom(cr.value())
> }
>
>


Map and MapPartitions with partition-local variable

2016-11-17 Thread Zsolt Tóth
Any comment on this one?

On 16 Nov 2016, at 12:59 PM, "Zsolt Tóth" wrote:

> Hi,
>
> I need to run a map() and a mapPartitions() on my input DF. As a
> side-effect of the map(), a partition-local variable should be updated,
> that is used in the mapPartitions() afterwards.
> I can't use Broadcast variable, because it's shared between partitions on
> the same executor.
>
> Where can I define this variable?
> I could run a single mapPartitions() that defines the variable, iterates
> over the input (just as the map() would do), collect the result into an
> ArrayList, and then use the list's iterator (and the updated
> partition-local variable) as the input of the transformation that the
> original mapPartitions() did.
>
> It feels however, that this is not as optimal as running
> map()+mapPartitions() because I need to store the ArrayList (which is
> basically the whole data in the partition) in memory.
>
> Thanks,
> Zsolt
>


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-17 Thread Reynold Xin
Adding a new data type is an enormous undertaking and very invasive. I
don't think it is worth it in this case given there are clear, simple
workarounds.


On Thu, Nov 17, 2016 at 12:24 PM, kant kodali  wrote:

> Can we have a JSONType for Spark SQL?
>
> On Wed, Nov 16, 2016 at 8:41 PM, Nathan Lande 
> wrote:
>
>> If you are dealing with a bunch of different schemas in 1 field, figuring
>> out a strategy to deal with that will depend on your data and does not
>> really have anything to do with spark since mapping your JSON payloads to
>> tractable data structures will depend on business logic.
>>
>> The strategy of pulling out a blob into its on rdd and feeding it into
>> the JSON loader should work for any data source once you have your data
>> strategy figured out.
>>
>> On Wed, Nov 16, 2016 at 4:39 PM, kant kodali  wrote:
>>
>>> 1. I have a Cassandra Table where one of the columns is blob. And this
>>> blob contains a JSON encoded String however not all the blob's across the
>>> Cassandra table for that column are same (some blobs have difference json's
>>> than others) so In that case what is the best way to approach it? Do we
>>> need to put /group all the JSON Blobs that have same structure (same keys)
>>> into each individual data frame? For example, say if I have 5 json blobs
>>> that have same structure and another 3 JSON blobs that belongs to some
>>> other structure In this case do I need to create two data frames? (Attached
>>> is a screen shot of 2 rows of how my json looks like)
>>> 2. In my case, toJSON on RDD doesn't seem to help a lot. Attached a
>>> screen shot. Looks like I got the same data frame as my original one.
>>>
>>> Thanks much for these examples.
>>>
>>>
>>>
>>> On Wed, Nov 16, 2016 at 2:54 PM, Nathan Lande 
>>> wrote:
>>>
 I'm looking forward to 2.1 but, in the meantime, you can pull out the
 specific column into an RDD of JSON objects, pass this RDD into the
 read.json() and then join the results back onto your initial DF.

 Here is an example of what we do to unpack headers from Avro log data:

 def jsonLoad(path):
 #
 #load in the df
 raw = (sqlContext.read
 .format('com.databricks.spark.avro')
 .load(path)
 )
 #
 #define json blob, add primary key elements (hi and lo)
 #
 JSONBlob = concat(
 lit('{'),
 concat(lit('"lo":'), col('header.eventId.lo').cast('string'),
 lit(',')),
 concat(lit('"hi":'), col('header.eventId.hi').cast('string'),
 lit(',')),
 concat(lit('"response":'), decode('requestResponse.response',
 'UTF-8')),
 lit('}')
 )
 #
 #extract the JSON blob as a string
 rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))
 #
 #transform the JSON string into a DF struct object
 structuredJSON = sqlContext.read.json(rawJSONString)
 #
 #join the structured JSON back onto the initial DF using the hi and
 lo join keys
 final = (raw.join(structuredJSON,
 ((raw['header.eventId.lo'] == structuredJSON['lo']) &
 (raw['header.eventId.hi'] == structuredJSON['hi'])),
 'left_outer')
 .drop('hi')
 .drop('lo')
 )
 #
 #win
 return final

 On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <
 mich...@databricks.com> wrote:

> On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon 
> wrote:
>
>> Maybe it sounds like you are looking for from_json/to_json functions
>> after en/decoding properly.
>>
>
> Which are new built-in functions that will be released with Spark 2.1.
>


>>>
>>
>


Re: How do I convert json_encoded_blob_column into a data frame? (This may be a feature request)

2016-11-17 Thread kant kodali
Can we have a JSONType for Spark SQL?

On Wed, Nov 16, 2016 at 8:41 PM, Nathan Lande  wrote:

> If you are dealing with a bunch of different schemas in 1 field, figuring
> out a strategy to deal with that will depend on your data and does not
> really have anything to do with spark since mapping your JSON payloads to
> tractable data structures will depend on business logic.
>
> The strategy of pulling out a blob into its on rdd and feeding it into the
> JSON loader should work for any data source once you have your data
> strategy figured out.
>
> On Wed, Nov 16, 2016 at 4:39 PM, kant kodali  wrote:
>
>> 1. I have a Cassandra Table where one of the columns is blob. And this
>> blob contains a JSON encoded String however not all the blob's across the
>> Cassandra table for that column are same (some blobs have difference json's
>> than others) so In that case what is the best way to approach it? Do we
>> need to put /group all the JSON Blobs that have same structure (same keys)
>> into each individual data frame? For example, say if I have 5 json blobs
>> that have same structure and another 3 JSON blobs that belongs to some
>> other structure In this case do I need to create two data frames? (Attached
>> is a screen shot of 2 rows of how my json looks like)
>> 2. In my case, toJSON on RDD doesn't seem to help a lot. Attached a
>> screen shot. Looks like I got the same data frame as my original one.
>>
>> Thanks much for these examples.
>>
>>
>>
>> On Wed, Nov 16, 2016 at 2:54 PM, Nathan Lande 
>> wrote:
>>
>>> I'm looking forward to 2.1 but, in the meantime, you can pull out the
>>> specific column into an RDD of JSON objects, pass this RDD into the
>>> read.json() and then join the results back onto your initial DF.
>>>
>>> Here is an example of what we do to unpack headers from Avro log data:
>>>
>>> def jsonLoad(path):
>>> #
>>> #load in the df
>>> raw = (sqlContext.read
>>> .format('com.databricks.spark.avro')
>>> .load(path)
>>> )
>>> #
>>> #define json blob, add primary key elements (hi and lo)
>>> #
>>> JSONBlob = concat(
>>> lit('{'),
>>> concat(lit('"lo":'), col('header.eventId.lo').cast('string'),
>>> lit(',')),
>>> concat(lit('"hi":'), col('header.eventId.hi').cast('string'),
>>> lit(',')),
>>> concat(lit('"response":'), decode('requestResponse.response',
>>> 'UTF-8')),
>>> lit('}')
>>> )
>>> #
>>> #extract the JSON blob as a string
>>> rawJSONString = raw.select(JSONBlob).rdd.map(lambda x: str(x[0]))
>>> #
>>> #transform the JSON string into a DF struct object
>>> structuredJSON = sqlContext.read.json(rawJSONString)
>>> #
>>> #join the structured JSON back onto the initial DF using the hi and
>>> lo join keys
>>> final = (raw.join(structuredJSON,
>>> ((raw['header.eventId.lo'] == structuredJSON['lo']) &
>>> (raw['header.eventId.hi'] == structuredJSON['hi'])),
>>> 'left_outer')
>>> .drop('hi')
>>> .drop('lo')
>>> )
>>> #
>>> #win
>>> return final
>>>
>>> On Wed, Nov 16, 2016 at 10:50 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 On Wed, Nov 16, 2016 at 2:49 AM, Hyukjin Kwon 
 wrote:

> Maybe it sounds like you are looking for from_json/to_json functions
> after en/decoding properly.
>

 Which are new built-in functions that will be released with Spark 2.1.

>>>
>>>
>>
>


Spark Submit --> Unable to reach cluster manager to request executors

2016-11-17 Thread KhajaAsmath Mohammed
Hello Everyone,

Could you please let me know if there is any optimal way to request executors
when submitting a job in yarn mode.

My job is running slowly, printing out the following statements. Is there a way
to speed up the process?


16/11/17 14:03:49 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:49 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 2 total executors!
16/11/17 14:03:50 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:50 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 3 total executors!
16/11/17 14:03:51 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:51 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 4 total executors!
16/11/17 14:03:52 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:52 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 5 total executors!
16/11/17 14:03:53 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:53 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 6 total executors!
16/11/17 14:03:54 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:54 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 7 total executors!
16/11/17 14:03:55 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:55 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 8 total executors!
16/11/17 14:03:56 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:56 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 9 total executors!
16/11/17 14:03:57 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:57 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 10 total executors!
16/11/17 14:03:58 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:58 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 11 total executors!
16/11/17 14:03:59 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
16/11/17 14:03:59 WARN spark.ExecutorAllocationManager: Unable to reach the
cluster manager to request 12 total executors!
16/11/17 14:04:00 WARN spark.SparkContext: Requesting executors is only
supported in coarse-grained mode
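
If the cause is dynamic allocation being enabled in a mode that does not support
it (an assumption, not confirmed from the logs alone), one thing sometimes tried
is pinning a fixed executor count instead, e.g.:

// hedged sketch; the app name and executor count are placeholders
val conf = new org.apache.spark.SparkConf()
  .setAppName("MyJob")
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.instances", "10")

The same settings can also be passed with --conf on spark-submit.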

Thanks,
Asmath.


Re: Spark AVRO S3 read not working for partitioned data

2016-11-17 Thread Jon Gregg
Making a guess here: you need to add s3:ListBucket?

http://stackoverflow.com/questions/35803808/spark-saveastextfile-to-s3-fails


On Thu, Nov 17, 2016 at 2:11 PM, Jain, Nishit 
wrote:

> When I read a specific file it works:
>
> val filePath= "s3n://bucket_name/f1/f2/avro/dt=2016-10-19/hr=19/00"
> val df = spark.read.avro(filePath)
>
> But if I point to a folder to read date partitioned data it fails:
>
> val filePath="s3n://bucket_name/f1/f2/avro/dt=2016-10-19/"
>
> I get this error:
>
> Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: 
> org.jets3t.service.S3ServiceException: S3 HEAD request failed for 
> '/f1%2Ff2%2Favro%2Fdt%3D2016-10-19' - ResponseCode=403, 
> ResponseMessage=Forbidden
> at 
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleServiceException(Jets3tNativeFileSystemStore.java:245)
> at 
> org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:119)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
> at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at org.apache.hadoop.fs.s3native.$Proxy7.retrieveMetadata(Unknown Source)
> at 
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
> at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:374)
> at 
> org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.immutable.List.flatMap(List.scala:344)
> at 
> org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
> at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
> at 
> com.databricks.spark.avro.package$AvroDataFrameReader$$anonfun$avro$2.apply(package.scala:34)
> at 
> com.databricks.spark.avro.package$AvroDataFrameReader$$anonfun$avro$2.apply(package.scala:34)
> at BasicS3Avro$.main(BasicS3Avro.scala:55)
> at BasicS3Avro.main(BasicS3Avro.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>
>
> Am I missing anything?
>
>
>


Re: does column order matter in dataframe.repartition?

2016-11-17 Thread Sean Owen
It's not in general true that 100 different partition keys go to 100
partitions -- it depends on the partitioner, but it wouldn't be true in the
case of a default HashPartitioner. But, yeah, you'd expect a reasonably even
distribution.

What happens in all cases depends on the partitioner. I haven't tested it
(you should just try it) but I would assume that switching the columns
could result in a different assignment to partitions. You would not in
general want to be sensitive to the exact partitioning unless you were
using your own custom partitioning.
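
A quick way to try it empirically, assuming the df and columns from the question below:

import org.apache.spark.sql.functions.{col, spark_partition_id}

// count rows per partition under each column order and compare the two outputs
df.repartition(100, col("date"), col("company_id"))
  .groupBy(spark_partition_id()).count().show()

df.repartition(100, col("company_id"), col("date"))
  .groupBy(spark_partition_id()).count().show()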

On Thu, Nov 17, 2016 at 7:41 PM Cesar  wrote:

>
> I am using the next line to re-partition a data frame by multiple columns:
>
> val partitionColumns = Seq("date", "company_id").map(x => new Column(x))
> val numPartitions = 100
>
> val dfRepartitioned = df.repartition(numPartitions, partitionColumns: _*)
>
> I understand that if the number of combinations of date and company_id is
> at most 100, each combination of will go to a different partition.
>
> My question is, what happens when the number of combinations larger than
> 100 ? Does re-partition changes in behavior if I switch the column order in
> the definition of partitionColumns variable?
>
> Thanks
> --
> Cesar Flores
>


Re: How to load only the data of the last partition

2016-11-17 Thread Daniel Haviv
Hi Samy,
If you're working with hive you could create a partitioned table and update
it's partitions' locations to the last version so when you'll query it
using spark, you'll always get the latest version.

Daniel

On Thu, Nov 17, 2016 at 9:05 PM, Samy Dindane  wrote:

> Hi,
>
> I have some data partitioned this way:
>
> /data/year=2016/month=9/version=0
> /data/year=2016/month=10/version=0
> /data/year=2016/month=10/version=1
> /data/year=2016/month=10/version=2
> /data/year=2016/month=10/version=3
> /data/year=2016/month=11/version=0
> /data/year=2016/month=11/version=1
>
> When using this data, I'd like to load the last version only of each month.
>
> A simple way to do this is to do `load("/data/year=2016/month=11/version=3")`
> instead of doing `load("/data")`.
> The drawback of this solution is the loss of partitioning information such
> as `year` and `month`, which means it would not be possible to apply
> operations based on the year or the month anymore.
>
> Is it possible to ask Spark to load the last version only of each month?
> How would you go about this?
>
> Thank you,
>
> Samy
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread titli batali
That would help, but again, in a particular partition I would need to
iterate over the customers having the first n letters of user id in that
partition. I want to get rid of nested iterations.

Thanks

On Thu, Nov 17, 2016 at 10:28 PM, Xiaomeng Wan  wrote:

> You can partitioned on the first n letters of userid
>
> On 17 November 2016 at 08:25, titli batali  wrote:
>
>> Hi,
>>
>> I have a use case, where we have 1000 csv files with a column user_Id,
>> having 8 million unique users. The data contains: userid,date,transaction,
>> where we run some queries.
>>
>> We have a case where we need to iterate for each transaction in a
>> particular date for each user. There is three nesting loops
>>
>> for(user){
>>   for(date){
>> for(transactions){
>>   //Do Something
>>   }
>>}
>> }
>>
>> i.e we do similar thing for every (date,transaction) tuple for a
>> particular user. In order to get away from the loop structure and decrease the
>> processing time, we are converting the csv files to parquet and
>> partitioning them by userid:
>> df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").
>>
>> So that while reading the parquet files, we read a particular user in a
>> particular partition and create a Cartesian product of (date X transaction)
>> and work on the tuple in each partition, to achieve the above level of
>> nesting. Partitioning on 8 million users is it a bad option. What could be
>> a better way to achieve this?
>>
>> Thanks
>>
>>
>>
>
>


does column order matter in dataframe.repartition?

2016-11-17 Thread Cesar
I am using the next line to re-partition a data frame by multiple columns:

val partitionColumns = Seq("date", "company_id").map(x => new Column(x))
val numPartitions = 100

val dfRepartitioned = df.repartition(numPartitions, partitionColumns: _*)

I understand that if the number of combinations of date and company_id is
at most 100, each combination will go to a different partition.

My question is, what happens when the number of combinations is larger than
100? Does repartition change its behavior if I switch the column order in
the definition of the partitionColumns variable?

Thanks
-- 
Cesar Flores


Spark 2.0.2, Structured Streaming with kafka source... Unable to parse the value to Object..

2016-11-17 Thread shyla deshpande
val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val dframe1 = spark.readStream.format("kafka").
  option("kafka.bootstrap.servers","localhost:9092").
  option("subscribe","student").load()

*How do I deserialize the value column from dataframe1 *

*which is Array[Byte] to Student object using Student.parseFrom..???*

*Please help.*

*Thanks.*



// Stream of votes from Kafka as bytes
val votesAsBytes = KafkaUtils.createDirectStream[String, Array[Byte]](
  ssc, LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, Array[Byte]](Array("votes"), kafkaParams))
// Parse them into Vote case class.
val votes: DStream[Vote] = votesAsBytes.map {
  (cr: ConsumerRecord[String, Array[Byte]]) =>
    Vote.parseFrom(cr.value())
}


Spark AVRO S3 read not working for partitioned data

2016-11-17 Thread Jain, Nishit
When I read a specific file it works:

val filePath= "s3n://bucket_name/f1/f2/avro/dt=2016-10-19/hr=19/00"
val df = spark.read.avro(filePath)


But if I point to a folder to read date partitioned data it fails:

val filePath="s3n://bucket_name/f1/f2/avro/dt=2016-10-19/"

I get this error:

Exception in thread "main" org.apache.hadoop.fs.s3.S3Exception: 
org.jets3t.service.S3ServiceException: S3 HEAD request failed for 
'/f1%2Ff2%2Favro%2Fdt%3D2016-10-19' - ResponseCode=403, 
ResponseMessage=Forbidden
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleServiceException(Jets3tNativeFileSystemStore.java:245)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:119)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy7.retrieveMetadata(Unknown Source)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:374)
at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at 
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
at 
com.databricks.spark.avro.package$AvroDataFrameReader$$anonfun$avro$2.apply(package.scala:34)
at 
com.databricks.spark.avro.package$AvroDataFrameReader$$anonfun$avro$2.apply(package.scala:34)
at BasicS3Avro$.main(BasicS3Avro.scala:55)
at BasicS3Avro.main(BasicS3Avro.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)


Am I missing anything?
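
Two things that may be worth checking, though neither is a confirmed fix: whether the credentials have list permission on the bucket prefix (the 403 is raised on a HEAD/metadata call, not on a data read), and whether the s3a connector behaves better than s3n for partitioned directory listings. A sketch of the latter, with placeholder keys:

import com.databricks.spark.avro._

// Keys below are placeholders; with IAM roles or core-site.xml settings they
// may not be needed at all.
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "<access-key>")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "<secret-key>")

val df = spark.read.avro("s3a://bucket_name/f1/f2/avro/dt=2016-10-19/")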




replace some partitions when writing dataframe

2016-11-17 Thread Koert Kuipers
i am looking into writing a dataframe to parquet using partitioning. so
something like

df
  .write
  .mode(saveMode)
  .partitionBy(partitionColumn)
  .format("parquet")
  .save(path)

i imagine i will have thousands of partitions. generally my goal is not to
recreate all partitions every time, but just a few partitions. the
partitions i do write to i want to replace all the data in.

i would expect this to be a general and typical use case since a true
append (adding data to partitions) is messy and not idempotent and to be
avoided by design (in fact i am not sure why it exists at all, unless
transactions are supported). redoing all partitions is very inefficient.

what saveMode do i use? in my tests if i use saveMode=Overwrite then i lose
all partitions. if i use saveMode=Append i get the dangerous non-idempotent
behavior that adds to partitions. i don't think saveMode=Ignore or
saveMode=ErrorIfExists will help me either.
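
A sketch of a common workaround, not a built-in saveMode: compute which partitions changed and overwrite only those directories (newer Spark releases also expose spark.sql.sources.partitionOverwriteMode=dynamic for this). partitionsToReplace below is a placeholder for the partition values recomputed in this run.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Each recomputed partition is written to its own directory with Overwrite,
// so only those directories are replaced and the rest are left alone.
partitionsToReplace.foreach { p =>
  df.filter(col(partitionColumn) === p)
    .drop(partitionColumn)             // the directory name already carries the value
    .write
    .mode(SaveMode.Overwrite)
    .format("parquet")
    .save(s"$path/$partitionColumn=$p")
}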


How to load only the data of the last partition

2016-11-17 Thread Samy Dindane

Hi,

I have some data partitioned this way:

/data/year=2016/month=9/version=0
/data/year=2016/month=10/version=0
/data/year=2016/month=10/version=1
/data/year=2016/month=10/version=2
/data/year=2016/month=10/version=3
/data/year=2016/month=11/version=0
/data/year=2016/month=11/version=1

When using this data, I'd like to load the last version only of each month.

A simple way to do this is to do `load("/data/year=2016/month=11/version=3")` instead of 
doing `load("/data")`.
The drawback of this solution is the loss of partitioning information such as 
`year` and `month`, which means it would not be possible to apply operations 
based on the year or the month anymore.

Is it possible to ask Spark to load the last version only of each month? How 
would you go about this?
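
On the drawback mentioned above, the basePath option may help: it keeps year, month and version as partition columns even when the reader is pointed at only some of the directories. A minimal sketch, assuming the default parquet format:

// year/month/version stay available as columns even though only one leaf
// directory is actually read.
val df = spark.read
  .option("basePath", "/data")
  .load("/data/year=2016/month=11/version=1")

Choosing the latest version of each month would still need a directory listing first (for example via Hadoop's FileSystem API), so that the selected leaf paths can be passed to the reader.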

Thank you,

Samy

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-17 Thread Hoang Bao Thien
I am sorry I don't understand your idea. What do you mean exactly?

On Fri, Nov 18, 2016 at 1:50 AM, Cody Koeninger  wrote:

> Ok, I don't think I'm clear on the issue then.  Can you say what the
> expected behavior is, and what the observed behavior is?
>
> On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien 
> wrote:
> > Hi,
> >
> > Thanks for your comments. But in fact, I don't want to limit the size of
> > batches, it could be any greater size as it does.
> >
> > Thien
> >
> > On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger 
> wrote:
> >>
> >> If you want a consistent limit on the size of batches, use
> >> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
> >> createDirectStream)
> >>
> >> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
> >>
> >> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien <
> hbthien0...@gmail.com>
> >> wrote:
> >> > Hi,
> >> >
> >> > I use CSV and other text files to Kafka just to test Kafka + Spark
> >> > Streaming
> >> > by using direct stream. That's why I don't want Spark streaming reads
> >> > CSVs
> >> > or text files directly.
> >> > In addition, I don't want a giant batch of records like the link you
> >> > sent.
> >> > The problem is that we should receive the "similar" number of record
> of
> >> > all
> >> > batchs instead of the first two or three batches have so large number
> of
> >> > records (e.g., 100K) but the last 1000 batches with only 200 records.
> >> >
> >> > I know that the problem is not from the auto.offset.reset=largest,
> but I
> >> > don't know what I can do in this case.
> >> >
> >> > Do you and other ones could suggest me some solutions please as this
> >> > seems
> >> > the normal situation with Kafka+SpartStreaming.
> >> >
> >> > Thanks.
> >> > Alex
> >> >
> >> >
> >> >
> >> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Yeah, if you're reporting issues, please be clear as to whether
> >> >> backpressure is enabled, and whether maxRatePerPartition is set.
> >> >>
> >> >> I expect that there is something wrong with backpressure, see e.g.
> >> >> https://issues.apache.org/jira/browse/SPARK-18371
> >> >>
> >> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang 
> wrote:
> >> >> > I hit similar issue with Spark Streaming. The batch size seemed a
> >> >> > little
> >> >> > random. Sometime it was large with many Kafka messages inside same
> >> >> > batch,
> >> >> > sometimes it was very small with just a few messages. Is it
> possible
> >> >> > that
> >> >> > was caused by the backpressure implementation in Spark Streaming?
> >> >> >
> >> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger <
> c...@koeninger.org>
> >> >> > wrote:
> >> >> >>
> >> >> >> Moved to user list.
> >> >> >>
> >> >> >> I'm not really clear on what you're trying to accomplish (why put
> >> >> >> the
> >> >> >> csv file through Kafka instead of reading it directly with spark?)
> >> >> >>
> >> >> >> auto.offset.reset=largest just means that when starting the job
> >> >> >> without any defined offsets, it will start at the highest (most
> >> >> >> recent) available offsets.  That's probably not what you want if
> >> >> >> you've already loaded csv lines into kafka.
> >> >> >>
> >> >> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
> >> >> >> 
> >> >> >> wrote:
> >> >> >> > Hi all,
> >> >> >> >
> >> >> >> > I would like to ask a question related to the size of Kafka
> >> >> >> > stream. I
> >> >> >> > want
> >> >> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming
> >> >> >> > to
> >> >> >> > get
> >> >> >> > the
> >> >> >> > output from Kafka and then save to Hive by using SparkSQL. The
> >> >> >> > file
> >> >> >> > csv
> >> >> >> > is
> >> >> >> > about 100MB with ~250K messages/rows (Each row has about 10
> fields
> >> >> >> > of
> >> >> >> > integer). I see that Spark Streaming first received two
> >> >> >> > partitions/batches,
> >> >> >> > the first is of 60K messages and the second is of 50K msgs. But
> >> >> >> > from
> >> >> >> > the
> >> >> >> > third batch, Spark just received 200 messages for each batch (or
> >> >> >> > partition).
> >> >> >> > I think that this problem is coming from Kafka or some
> >> >> >> > configuration
> >> >> >> > in
> >> >> >> > Spark. I already tried to configure with the setting
> >> >> >> > "auto.offset.reset=largest", but every batch only gets 200
> >> >> >> > messages.
> >> >> >> >
> >> >> >> > Could you please tell me how to fix this problem?
> >> >> >> > Thank you so much.
> >> >> >> >
> >> >> >> > Best regards,
> >> >> >> > Alex
> >> >> >> >
> >> >> >>
> >> >> >>
> >> >> >> 
> -
> >> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >> >>
> >> >> >
> >> >
> >> >
> >
> >
>


Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
Ok, I don't think I'm clear on the issue then.  Can you say what the
expected behavior is, and what the observed behavior is?

On Thu, Nov 17, 2016 at 10:48 AM, Hoang Bao Thien  wrote:
> Hi,
>
> Thanks for your comments. But in fact, I don't want to limit the size of
> batches, it could be any greater size as it does.
>
> Thien
>
> On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger  wrote:
>>
>> If you want a consistent limit on the size of batches, use
>> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
>> createDirectStream)
>>
>> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>>
>> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien 
>> wrote:
>> > Hi,
>> >
>> > I use CSV and other text files to Kafka just to test Kafka + Spark
>> > Streaming
>> > by using direct stream. That's why I don't want Spark streaming reads
>> > CSVs
>> > or text files directly.
>> > In addition, I don't want a giant batch of records like the link you
>> > sent.
>> > The problem is that we should receive the "similar" number of record of
>> > all
>> > batchs instead of the first two or three batches have so large number of
>> > records (e.g., 100K) but the last 1000 batches with only 200 records.
>> >
>> > I know that the problem is not from the auto.offset.reset=largest, but I
>> > don't know what I can do in this case.
>> >
>> > Do you and other ones could suggest me some solutions please as this
>> > seems
>> > the normal situation with Kafka+SpartStreaming.
>> >
>> > Thanks.
>> > Alex
>> >
>> >
>> >
>> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger 
>> > wrote:
>> >>
>> >> Yeah, if you're reporting issues, please be clear as to whether
>> >> backpressure is enabled, and whether maxRatePerPartition is set.
>> >>
>> >> I expect that there is something wrong with backpressure, see e.g.
>> >> https://issues.apache.org/jira/browse/SPARK-18371
>> >>
>> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang  wrote:
>> >> > I hit similar issue with Spark Streaming. The batch size seemed a
>> >> > little
>> >> > random. Sometime it was large with many Kafka messages inside same
>> >> > batch,
>> >> > sometimes it was very small with just a few messages. Is it possible
>> >> > that
>> >> > was caused by the backpressure implementation in Spark Streaming?
>> >> >
>> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger 
>> >> > wrote:
>> >> >>
>> >> >> Moved to user list.
>> >> >>
>> >> >> I'm not really clear on what you're trying to accomplish (why put
>> >> >> the
>> >> >> csv file through Kafka instead of reading it directly with spark?)
>> >> >>
>> >> >> auto.offset.reset=largest just means that when starting the job
>> >> >> without any defined offsets, it will start at the highest (most
>> >> >> recent) available offsets.  That's probably not what you want if
>> >> >> you've already loaded csv lines into kafka.
>> >> >>
>> >> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> >> 
>> >> >> wrote:
>> >> >> > Hi all,
>> >> >> >
>> >> >> > I would like to ask a question related to the size of Kafka
>> >> >> > stream. I
>> >> >> > want
>> >> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming
>> >> >> > to
>> >> >> > get
>> >> >> > the
>> >> >> > output from Kafka and then save to Hive by using SparkSQL. The
>> >> >> > file
>> >> >> > csv
>> >> >> > is
>> >> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields
>> >> >> > of
>> >> >> > integer). I see that Spark Streaming first received two
>> >> >> > partitions/batches,
>> >> >> > the first is of 60K messages and the second is of 50K msgs. But
>> >> >> > from
>> >> >> > the
>> >> >> > third batch, Spark just received 200 messages for each batch (or
>> >> >> > partition).
>> >> >> > I think that this problem is coming from Kafka or some
>> >> >> > configuration
>> >> >> > in
>> >> >> > Spark. I already tried to configure with the setting
>> >> >> > "auto.offset.reset=largest", but every batch only gets 200
>> >> >> > messages.
>> >> >> >
>> >> >> > Could you please tell me how to fix this problem?
>> >> >> > Thank you so much.
>> >> >> >
>> >> >> > Best regards,
>> >> >> > Alex
>> >> >> >
>> >> >>
>> >> >>
>> >> >> -
>> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >> >>
>> >> >
>> >
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-17 Thread Hoang Bao Thien
Hi,

Thanks for your comments. But in fact, I don't want to limit the size of
the batches; they can be as large as they need to be.

Thien

On Fri, Nov 18, 2016 at 1:17 AM, Cody Koeninger  wrote:

> If you want a consistent limit on the size of batches, use
> spark.streaming.kafka.maxRatePerPartition  (assuming you're using
> createDirectStream)
>
> http://spark.apache.org/docs/latest/configuration.html#spark-streaming
>
> On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien 
> wrote:
> > Hi,
> >
> > I use CSV and other text files to Kafka just to test Kafka + Spark
> Streaming
> > by using direct stream. That's why I don't want Spark streaming reads
> CSVs
> > or text files directly.
> > In addition, I don't want a giant batch of records like the link you
> sent.
> > The problem is that we should receive the "similar" number of record of
> all
> > batchs instead of the first two or three batches have so large number of
> > records (e.g., 100K) but the last 1000 batches with only 200 records.
> >
> > I know that the problem is not from the auto.offset.reset=largest, but I
> > don't know what I can do in this case.
> >
> > Do you and other ones could suggest me some solutions please as this
> seems
> > the normal situation with Kafka+SpartStreaming.
> >
> > Thanks.
> > Alex
> >
> >
> >
> > On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger 
> wrote:
> >>
> >> Yeah, if you're reporting issues, please be clear as to whether
> >> backpressure is enabled, and whether maxRatePerPartition is set.
> >>
> >> I expect that there is something wrong with backpressure, see e.g.
> >> https://issues.apache.org/jira/browse/SPARK-18371
> >>
> >> On Wed, Nov 16, 2016 at 5:05 PM, bo yang  wrote:
> >> > I hit similar issue with Spark Streaming. The batch size seemed a
> little
> >> > random. Sometime it was large with many Kafka messages inside same
> >> > batch,
> >> > sometimes it was very small with just a few messages. Is it possible
> >> > that
> >> > was caused by the backpressure implementation in Spark Streaming?
> >> >
> >> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger 
> >> > wrote:
> >> >>
> >> >> Moved to user list.
> >> >>
> >> >> I'm not really clear on what you're trying to accomplish (why put the
> >> >> csv file through Kafka instead of reading it directly with spark?)
> >> >>
> >> >> auto.offset.reset=largest just means that when starting the job
> >> >> without any defined offsets, it will start at the highest (most
> >> >> recent) available offsets.  That's probably not what you want if
> >> >> you've already loaded csv lines into kafka.
> >> >>
> >> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
> >> >> 
> >> >> wrote:
> >> >> > Hi all,
> >> >> >
> >> >> > I would like to ask a question related to the size of Kafka
> stream. I
> >> >> > want
> >> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
> >> >> > get
> >> >> > the
> >> >> > output from Kafka and then save to Hive by using SparkSQL. The file
> >> >> > csv
> >> >> > is
> >> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields
> of
> >> >> > integer). I see that Spark Streaming first received two
> >> >> > partitions/batches,
> >> >> > the first is of 60K messages and the second is of 50K msgs. But
> from
> >> >> > the
> >> >> > third batch, Spark just received 200 messages for each batch (or
> >> >> > partition).
> >> >> > I think that this problem is coming from Kafka or some
> configuration
> >> >> > in
> >> >> > Spark. I already tried to configure with the setting
> >> >> > "auto.offset.reset=largest", but every batch only gets 200
> messages.
> >> >> >
> >> >> > Could you please tell me how to fix this problem?
> >> >> > Thank you so much.
> >> >> >
> >> >> > Best regards,
> >> >> > Alex
> >> >> >
> >> >>
> >> >> 
> -
> >> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >> >>
> >> >
> >
> >
>


analysing ibm mq messages using spark streaming

2016-11-17 Thread Mich Talebzadeh
hi,

I guess the only way to do this is to read IBM MQ messages into Flume, ingest
them into HDFS and read them from there. Alternatively, use Flume to ingest
the data into HBase and then use Spark on HBase.

I don't think there is an API like the Spark Streaming Kafka integration for IBM MQ?
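
As far as I know there is indeed no official IBM MQ connector like the Kafka one, but Spark Streaming's custom Receiver API can wrap a JMS consumer directly, which is one way to avoid the Flume hop. A rough sketch only; the MQ connection-factory setup (queue manager, channel, credentials) is omitted and all names are placeholders:

import javax.jms.{Connection, Session, TextMessage}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// A JMS-backed receiver: each received text message is handed to Spark via store().
class JmsReceiver(createConnection: () => Connection, queueName: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    new Thread("JMS Receiver") {
      override def run(): Unit = {
        val connection = createConnection()
        val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val consumer = session.createConsumer(session.createQueue(queueName))
        connection.start()
        while (!isStopped()) {
          consumer.receive(1000) match {            // poll with a 1 second timeout
            case m: TextMessage => store(m.getText)
            case _              =>                  // timeout (null) or non-text message
          }
        }
        connection.close()
      }
    }.start()
  }

  def onStop(): Unit = {}  // the receive loop above exits once isStopped() is true
}

// Usage, with factory standing in for an MQ-specific ConnectionFactory:
// val lines = ssc.receiverStream(new JmsReceiver(() => factory.createConnection(), "MY.QUEUE"))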

thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Kafka segmentation

2016-11-17 Thread Cody Koeninger
If you want a consistent limit on the size of batches, use
spark.streaming.kafka.maxRatePerPartition  (assuming you're using
createDirectStream)

http://spark.apache.org/docs/latest/configuration.html#spark-streaming
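
For reference, a minimal sketch of where that setting goes; the app name, batch interval and rate are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// With createDirectStream, each batch is then capped at roughly
// maxRatePerPartition * batch interval (seconds) * number of Kafka partitions.
val conf = new SparkConf()
  .setAppName("kafka-to-hive")                               // placeholder name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // records/sec/partition
val ssc = new StreamingContext(conf, Seconds(5))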

On Thu, Nov 17, 2016 at 12:52 AM, Hoang Bao Thien  wrote:
> Hi,
>
> I use CSV and other text files to Kafka just to test Kafka + Spark Streaming
> by using direct stream. That's why I don't want Spark streaming reads CSVs
> or text files directly.
> In addition, I don't want a giant batch of records like the link you sent.
> The problem is that we should receive the "similar" number of record of all
> batchs instead of the first two or three batches have so large number of
> records (e.g., 100K) but the last 1000 batches with only 200 records.
>
> I know that the problem is not from the auto.offset.reset=largest, but I
> don't know what I can do in this case.
>
> Do you and other ones could suggest me some solutions please as this seems
> the normal situation with Kafka+SpartStreaming.
>
> Thanks.
> Alex
>
>
>
> On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger  wrote:
>>
>> Yeah, if you're reporting issues, please be clear as to whether
>> backpressure is enabled, and whether maxRatePerPartition is set.
>>
>> I expect that there is something wrong with backpressure, see e.g.
>> https://issues.apache.org/jira/browse/SPARK-18371
>>
>> On Wed, Nov 16, 2016 at 5:05 PM, bo yang  wrote:
>> > I hit similar issue with Spark Streaming. The batch size seemed a little
>> > random. Sometime it was large with many Kafka messages inside same
>> > batch,
>> > sometimes it was very small with just a few messages. Is it possible
>> > that
>> > was caused by the backpressure implementation in Spark Streaming?
>> >
>> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger 
>> > wrote:
>> >>
>> >> Moved to user list.
>> >>
>> >> I'm not really clear on what you're trying to accomplish (why put the
>> >> csv file through Kafka instead of reading it directly with spark?)
>> >>
>> >> auto.offset.reset=largest just means that when starting the job
>> >> without any defined offsets, it will start at the highest (most
>> >> recent) available offsets.  That's probably not what you want if
>> >> you've already loaded csv lines into kafka.
>> >>
>> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien
>> >> 
>> >> wrote:
>> >> > Hi all,
>> >> >
>> >> > I would like to ask a question related to the size of Kafka stream. I
>> >> > want
>> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
>> >> > get
>> >> > the
>> >> > output from Kafka and then save to Hive by using SparkSQL. The file
>> >> > csv
>> >> > is
>> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
>> >> > integer). I see that Spark Streaming first received two
>> >> > partitions/batches,
>> >> > the first is of 60K messages and the second is of 50K msgs. But from
>> >> > the
>> >> > third batch, Spark just received 200 messages for each batch (or
>> >> > partition).
>> >> > I think that this problem is coming from Kafka or some configuration
>> >> > in
>> >> > Spark. I already tried to configure with the setting
>> >> > "auto.offset.reset=largest", but every batch only gets 200 messages.
>> >> >
>> >> > Could you please tell me how to fix this problem?
>> >> > Thank you so much.
>> >> >
>> >> > Best regards,
>> >> > Alex
>> >> >
>> >>
>> >> -
>> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >>
>> >
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Fill nan with last (good) value

2016-11-17 Thread geoHeil
How can I fill nan values with the last (good) value?

For me, it would be enough to fill it with the previous value of a window
function. So far I could it not get to work as my window function only
returns nan values.
Here is code for a minimal example:
http://stackoverflow.com/questions/40592207/spark-scala-fill-nan-with-last-good-observation
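
A sketch of the usual forward-fill pattern, in case it helps; column names are placeholders, and last(..., ignoreNulls = true) needs Spark 2.0 or later:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, lit, when}

// Column names ("group", "ts", "value") stand in for the real schema.
// Step 1 turns NaN into null, because last(..., ignoreNulls = true) skips
// nulls but not NaN; that is likely why the window alone kept returning NaN.
// Step 2 carries the last non-null value forward within each group, by time.
val w = Window.partitionBy("group").orderBy("ts").rowsBetween(Long.MinValue, 0)

val filled = df
  .withColumn("value", when(col("value").isNaN, lit(null).cast("double")).otherwise(col("value")))
  .withColumn("value_filled", last(col("value"), ignoreNulls = true).over(w))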



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fill-nan-with-last-good-value-tp28099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Using mapWithState without a checkpoint

2016-11-17 Thread Daniel Haviv
Hi,
Is it possible to use mapWithState without checkpointing at all ?
I'd rather have the whole application fail, restart and reload an
initialState RDD than pay for checkpointing every 10 batches.

Thank you,
Daniel


Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-17 Thread Dirceu Semighini Filho
Nice, thank you. I'll test this property to see if the errors stop.


2016-11-17 14:48 GMT-02:00 Arijit :

> Hi Dirceu,
>
>
> For the append issue we are setting "hdfs.append.support" (from Spark code
> which reads HDFS configuration) to "true" in hdfs-site.xml and that seemed
> to have solved the issue. Of course we are using HDFS which does support
> append. I think the actual configuration Spark should check is
> "dfs.support.append".
>
>
> I believe failure is intermittent since in most cases a new file is
> created to store the block addition event. I need to look into the code
> again to see when these files are created new and when they are appended.
>
>
> Thanks, Arijit
>
>
> --
> *From:* Dirceu Semighini Filho 
> *Sent:* Thursday, November 17, 2016 6:50:28 AM
> *To:* Arijit
> *Cc:* Tathagata Das; user@spark.apache.org
>
> *Subject:* Re: Spark Streaming Data loss on failure to write
> BlockAdditionEvent failure to WAL
>
> Hi Arijit,
> Have you find a solution for this? I'm facing the same problem in Spark
> 1.6.1, but here the error happens only a few times, so our hdfs does
> support append.
> This is what I can see in the logs:
> 2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer]
> WriteAheadLogManager  for Thread: Failed to write to write ahead log after
> 3 failures
>
>
>
>
> 2016-11-08 14:47 GMT-02:00 Arijit :
>
>> Thanks TD.
>>
>>
>> Is "hdfs.append.support" a standard configuration? I see a seemingly
>> equivalent configuration "dfs.support.append" that is used in our
>> version of HDFS.
>>
>>
>> In case we want to use a pseudo file-system (like S3)  which does not
>> support append what are our options? I am not familiar with the code yet
>> but is it possible to generate a new file whenever conflict of this sort
>> happens?
>>
>>
>> Thanks again, Arijit
>> --
>> *From:* Tathagata Das 
>> *Sent:* Monday, November 7, 2016 7:59:06 PM
>> *To:* Arijit
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Spark Streaming Data loss on failure to write
>> BlockAdditionEvent failure to WAL
>>
>> For WAL in Spark to work with HDFS, the HDFS version you are running must
>> support file appends. Contact your HDFS package/installation provider to
>> figure out whether this is supported by your HDFS installation.
>>
>> On Mon, Nov 7, 2016 at 2:04 PM, Arijit  wrote:
>>
>>> Hello All,
>>>
>>>
>>> We are using Spark 1.6.2 with WAL enabled and encountering data loss
>>> when the following exception/warning happens. We are using HDFS as our
>>> checkpoint directory.
>>>
>>>
>>> Questions are:
>>>
>>>
>>> 1. Is this a bug in Spark or issue with our configuration? Source looks
>>> like the following. Which file already exist or who is suppose to set
>>> hdfs.append.support configuration? Why doesn't it happen all the time?
>>>
>>>
>>> private[streaming] object HdfsUtils {
>>>
>>>   def getOutputStream(path: String, conf: Configuration): 
>>> FSDataOutputStream = {
>>> val dfsPath = new Path(path)
>>> val dfs = getFileSystemForPath(dfsPath, conf)
>>> // If the file exists and we have append support, append instead of 
>>> creating a new file
>>> val stream: FSDataOutputStream = {
>>>   if (dfs.isFile(dfsPath)) {
>>> if (conf.getBoolean("hdfs.append.support", false) || 
>>> dfs.isInstanceOf[RawLocalFileSystem]) {
>>>   dfs.append(dfsPath)
>>> } else {
>>>   throw new IllegalStateException("File exists and there is no 
>>> append support!")
>>> }
>>>   } else {
>>> dfs.create(dfsPath)
>>>   }
>>> }
>>> stream
>>>   }
>>>
>>>
>>> 2. Why does the job not retry and eventually fail when this error
>>> occurs? The job skips processing the exact number of events dumped in the
>>> log. For this particular example I see 987 + 4686 events were not processed
>>> and are lost for ever (does not recover even on restart).
>>>
>>>
>>> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to
>>> write to write ahead log after 3 failures
>>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
>>> failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212
>>> lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$Defaul
>>> tPromise@5ce88cb6), Record(
>>> java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.
>>> concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>>> java.lang.IllegalStateException: File exists and there is no append
>>> support!
>>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(H
>>> dfsUtils.scala:35)
>>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter
>>> .org$apache$spark$streaming$util$FileBasedWriteAheadLogWrite
>>> r$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>>> at 

Re: Grouping Set

2016-11-17 Thread Andrés Ivaldi
I realize now that my data has null values, so those nulls come from the values
themselves, not from the calculated grouping set. But that raises another
problem: how can I detect which is which? Now I have this situation:

If my data is just a row like this [ {1:"A",2:null, 3:123}] the grouping set
(1) will give me
A, null, 123
A, null, 123

and with  [ {1:"A", 2:null, 3:123},{1:"A", 2:"b", 3:1}] the grouping set
(1) will give me
A, null, 124
A, null, 123
A, b, 1

A quick fix could be an isNull check with a label that I can detect, but that's
too dirty I think; the grouping set should return a value that can be
detected as "this column was grouped away", not null.
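
If the cluster is on Spark 2.0 or later, the grouping() and grouping_id() functions are meant for exactly this distinction. A sketch using the column names from the example above (they are the question's placeholders):

// grouping(b) is 1 on rows where b was aggregated away by the grouping set,
// and 0 where the null comes from the data itself.
val result = spark.sql("""
  SELECT a, b, sum(c) AS sum_c, grouping(b) AS b_was_grouped
  FROM table
  GROUP BY a, b GROUPING SETS ((a), (a, b))
""")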


On Mon, Nov 14, 2016 at 5:49 PM, ayan guha  wrote:

> And, run the same SQL in hive and post any difference.
> On 15 Nov 2016 07:48, "ayan guha"  wrote:
>
>> It should be A,yes. Can you please reproduce this with small data and
>> exact SQL?
>> On 15 Nov 2016 02:21, "Andrés Ivaldi"  wrote:
>>
>>> Hello, I'm tryin to use Grouping Set, but I dont know if it is a bug or
>>> the correct behavior.
>>>
>>> Givven the above example
>>> Select a,b,sum(c) from table group by a,b grouping set ( (a), (a,b) )
>>>
>>> What shound be the expected result
>>> A:
>>>
>>> A  | B| sum(c)
>>> xx | null | 
>>> xx | yy   | 
>>> xx | zz   | 
>>>
>>>
>>> B
>>> A   | B| sum(c)
>>> xx  | null | 
>>> xx  | yy   | 
>>> xx  | zz   | 
>>> null| yy   | 
>>> null| zz   | 
>>> null| null | 
>>>
>>>
>>> I believe is A, but i'm getting B
>>> thanks
>>>
>>> --
>>> Ing. Ivaldi Andres
>>>
>>


-- 
Ing. Ivaldi Andres


Re: Spark Partitioning Strategy with Parquet

2016-11-17 Thread Xiaomeng Wan
You can partition on the first n letters of userid
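
A sketch of that idea; uid_prefix is a made-up column name and 2 characters is an arbitrary prefix length:

import org.apache.spark.sql.functions.{col, substring}

// Derive a low-cardinality bucket (the first 2 characters of user_Id) and
// partition on that instead of 8 million distinct user ids.
df.withColumn("uid_prefix", substring(col("user_Id"), 0, 2))
  .write
  .format("parquet")
  .partitionBy("uid_prefix")
  .save("hdfs://path")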

On 17 November 2016 at 08:25, titli batali  wrote:

> Hi,
>
> I have a use case, where we have 1000 csv files with a column user_Id,
> having 8 million unique users. The data contains: userid,date,transaction,
> where we run some queries.
>
> We have a case where we need to iterate for each transaction in a
> particular date for each user. There is three nesting loops
>
> for(user){
>   for(date){
> for(transactions){
>   //Do Something
>   }
>}
> }
>
> i.e we do similar thing for every (date,transaction) tuple for a
> particular user. In order to get away with loop structure and decrease the
> processing time We are converting converting the csv files to parquet and
> partioning it with userid, df.write.format("parquet").
> partitionBy("useridcol").save("hdfs://path").
>
> So that while reading the parquet files, we read a particular user in a
> particular partition and create a Cartesian product of (date X transaction)
> and work on the tuple in each partition, to achieve the above level of
> nesting. Partitioning on 8 million users is it a bad option. What could be
> a better way to achieve this?
>
> Thanks
>
>
>


Re: SparkILoop doesn't run

2016-11-17 Thread Holden Karau
Moving to user list

So this might be a better question for the user list - but is there a
reason you are trying to use the SparkILoop for tests?

On Thu, Nov 17, 2016 at 5:47 PM Mohit Jaggi  wrote:

>
>
> I am trying to use SparkILoop to write some tests(shown below) but the
> test hangs with the following stack trace. Any idea what is going on?
>
>
> import org.apache.log4j.{Level, LogManager}
> import org.apache.spark.repl.SparkILoop
> import org.scalatest.{BeforeAndAfterAll, FunSuite}
>
> class SparkReplSpec extends FunSuite with BeforeAndAfterAll {
>
>   override def beforeAll(): Unit = {
>   }
>
>   override def afterAll(): Unit = {
>   }
>
>   test("yay!") {
> val rootLogger = LogManager.getRootLogger
> val logLevel = rootLogger.getLevel
> rootLogger.setLevel(Level.ERROR)
>
> val output = SparkILoop.run(
>   """
> |println("hello")
>   """.stripMargin)
>
> println(s" $output ")
>
>   }
> }
>
>
> /Library/Java/JavaVirtualMachines/jdk1.8.0_66.jdk/Contents/Home/bin/java
> -Dspark.master=local[*] -Didea.launcher.port=7532
> "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA CE.app/Contents/bin"
> -Dfile.encoding=UTF-8 -classpath "/Users/mohit/Library/Application
> 

Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-17 Thread Arijit
Hi Dirceu,


For the append issue we are setting "hdfs.append.support" (from Spark code 
which reads HDFS configuration) to "true" in hdfs-site.xml and that seemed to 
have solved the issue. Of course we are using HDFS which does support append. I 
think the actual configuration Spark should check is "dfs.support.append".


I believe failure is intermittent since in most cases a new file is created to 
store the block addition event. I need to look into the code again to see when 
these files are created new and when they are appended.


Thanks, Arijit



From: Dirceu Semighini Filho 
Sent: Thursday, November 17, 2016 6:50:28 AM
To: Arijit
Cc: Tathagata Das; user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent 
failure to WAL

Hi Arijit,
Have you find a solution for this? I'm facing the same problem in Spark 1.6.1, 
but here the error happens only a few times, so our hdfs does support append.
This is what I can see in the logs:
2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer] 
WriteAheadLogManager  for Thread: Failed to write to write ahead log after 3 
failures




2016-11-08 14:47 GMT-02:00 Arijit :

Thanks TD.


Is "hdfs.append.support" a standard configuration? I see a seemingly equivalent 
configuration "dfs.support.append" that is used in our version of HDFS.


In case we want to use a pseudo file-system (like S3)  which does not support 
append what are our options? I am not familiar with the code yet but is it 
possible to generate a new file whenever conflict of this sort happens?


Thanks again, Arijit


From: Tathagata Das
Sent: Monday, November 7, 2016 7:59:06 PM
To: Arijit
Cc: user@spark.apache.org
Subject: Re: Spark Streaming Data loss on failure to write BlockAdditionEvent 
failure to WAL

For WAL in Spark to work with HDFS, the HDFS version you are running must 
support file appends. Contact your HDFS package/installation provider to figure 
out whether this is supported by your HDFS installation.

On Mon, Nov 7, 2016 at 2:04 PM, Arijit  wrote:

Hello All,


We are using Spark 1.6.2 with WAL enabled and encountering data loss when the 
following exception/warning happens. We are using HDFS as our checkpoint 
directory.


Questions are:


1. Is this a bug in Spark or issue with our configuration? Source looks like 
the following. Which file already exist or who is suppose to set 
hdfs.append.support configuration? Why doesn't it happen all the time?


private[streaming] object HdfsUtils {

  def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
val dfsPath = new Path(path)
val dfs = getFileSystemForPath(dfsPath, conf)
// If the file exists and we have append support, append instead of 
creating a new file
val stream: FSDataOutputStream = {
  if (dfs.isFile(dfsPath)) {
if (conf.getBoolean("hdfs.append.support", false) || 
dfs.isInstanceOf[RawLocalFileSystem]) {
  dfs.append(dfsPath)
} else {
  throw new IllegalStateException("File exists and there is no append 
support!")
}
  } else {
dfs.create(dfsPath)
  }
}
stream
  }


2. Why does the job not retry and eventually fail when this error occurs? The 
job skips processing the exact number of events dumped in the log. For this 
particular example I see 987 + 4686 events were not processed and are lost for 
ever (does not recover even on restart).



16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write to 
write ahead log after 3 failures
16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer failed 
to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@5ce88cb6), 
Record(
java.nio.HeapByteBuffer[pos=1212 lim=1212 
cap=1212],1478553818985,scala.concurrent.impl.Promise$DefaultPromise@6d8f1feb))
java.lang.IllegalStateException: File exists and there is no append support!
at 
org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(HdfsUtils.scala:35)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.org$apache$spark$streaming$util$FileBasedWriteAheadLogWriter$$stream(FileBasedWriteAheadLogWriter.scala:33)
at 
org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter.<init>(FileBasedWriteAheadLogWriter.scala:41)

Fill na with last value

2016-11-17 Thread Georg Heiler
How can I fill nan values in spark with the last or the last good known
value?

Here is a minimal example http://stackoverflow.com/q/40592207/2587904
So far I tried a window function but unfortunately received only nan
values.

Kind regards
Georg


RE: Spark SQL join and subquery

2016-11-17 Thread Sood, Anjali
unsubscribe

-Original Message-
From: neil90 [mailto:neilp1...@icloud.com] 
Sent: Thursday, November 17, 2016 8:26 AM
To: user@spark.apache.org
Subject: Re: Spark SQL join and subquery

What version of Spark are you using? I believe this was fixed in 2.0



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-join-and-subquery-tp28093p28097.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark SQL join and subquery

2016-11-17 Thread neil90
What version of Spark are you using? I believe this was fixed in 2.0



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-join-and-subquery-tp28093p28097.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Fwd: Spark Partitioning Strategy with Parquet

2016-11-17 Thread titli batali
Hi,

I have a use case, where we have 1000 csv files with a column user_Id,
having 8 million unique users. The data contains: userid,date,transaction,
where we run some queries.

We have a case where we need to iterate for each transaction in a
particular date for each user. There is three nesting loops

for(user){
  for(date){
for(transactions){
  //Do Something
  }
   }
}

i.e. we do a similar thing for every (date, transaction) tuple for a particular
user. In order to get away from the loop structure and decrease the processing
time, we are converting the csv files to parquet and partitioning
them by userid:
df.write.format("parquet").partitionBy("useridcol").save("hdfs://path").

So while reading the parquet files, we read a particular user from a
particular partition, create a Cartesian product of (date X transaction),
and work on each tuple in that partition, to achieve the above level of
nesting. Is partitioning on 8 million users a bad option? What could be
a better way to achieve this?

Thanks


Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Stuart White
Sorry.  Small typo.  That last part should be:

val modifiedRows = rows
  .select(
    substring('age, 0, 2) as "age",
    when('gender === 1, "male").otherwise(when('gender === 2, "female").otherwise("unknown")) as "gender"
  )
modifiedRows.show

+---+---+
|age| gender|
+---+---+
| 90|   male|
| 80| female|
| 80|unknown|
+---+---+

On Thu, Nov 17, 2016 at 8:57 AM, Stuart White  wrote:
> import org.apache.spark.sql.functions._
>
> val rows = Seq(("90s", 1), ("80s", 2), ("80s", 3)).toDF("age", "gender")
> rows.show
>
> +---+--+
> |age|gender|
> +---+--+
> |90s| 1|
> |80s| 2|
> |80s| 3|
> +---+--+
>
> val modifiedRows
>   .select(
> substring('age, 0, 2) as "age",
> when('gender === 1, "male").otherwise(when('gender === 2,
> "female").otherwise("unknown")) as "gender"
>   )
> modifiedRows.show
>
> +---+---+
> |age| gender|
> +---+---+
> | 90|   male|
> | 80| female|
> | 80|unknown|
> +---+---+
>
> On Thu, Nov 17, 2016 at 3:37 AM, 颜发才(Yan Facai)  wrote:
>> Could you give me an example, how to use Column function?
>> Thanks very much.
>>
>> On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot 
>> wrote:
>>>
>>> Hi,
>>>
>>> You can use the Column functions provided by Spark API
>>>
>>>
>>> https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
>>>
>>> Hope this helps .
>>>
>>> Thanks,
>>> Divya
>>>
>>>
>>> On 17 November 2016 at 12:08, 颜发才(Yan Facai)  wrote:

 Hi,
 I have a sample, like:
 +---+--++
 |age|gender| city_id|
 +---+--++
 |   | 1|1042015:city_2044...|
 |90s| 2|1042015:city_2035...|
 |80s| 2|1042015:city_2061...|
 +---+--++

 and expectation is:
 "age":  90s -> 90, 80s -> 80
 "gender": 1 -> "male", 2 -> "female"

 I have two solutions:
 1. Handle each column separately,  and then join all by index.
 val age = input.select("age").map(...)
 val gender = input.select("gender").map(...)
 val result = ...

 2. Write utf function for each column, and then use in together:
  val result = input.select(ageUDF($"age"), genderUDF($"gender"))

 However, both are awkward,

 Does anyone have a better work flow?
 Write some custom Transforms and use pipeline?

 Thanks.



>>>
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



newAPIHadoopFile throws a JsonMappingException: Infinite recursion (StackOverflowError) error

2016-11-17 Thread David Robison
I am trying to create a new JavaPairRDD from data in an HDFS file. My code is:

sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", sparkConf);
JavaPairRDD<LongWritable, BytesWritable> inputRDD = sparkContext.newAPIHadoopFile(
    fileFilter, FixedLengthInputFormat.class, LongWritable.class, BytesWritable.class, config);

However, when I run the job I get the following error:

com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion 
(StackOverflowError) (through reference chain: 
scala.collection.convert.IterableWrapper[0]->org.apache.spark.rdd.RDDOperationScope["allScopes"]->scala.collection.convert.IterableWrapper[0]->org.apache.spark.rdd.RDDOperationScope["allScopes"]->...)
  at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:680)
  at 
com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
  at 
com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:132)
  at 
com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:30)
  at 
com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:16)
  at 
com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:185)
  at 
com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
  at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
  at 
com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)

Any thoughts as to what may be going wrong?
David

David R Robison
Senior Systems Engineer
O. +1 512 247 3700
M. +1 757 286 0022
david.robi...@psgglobal.net
www.psgglobal.net

Prometheus Security Group Global, Inc.
3019 Alvin Devane Boulevard
Building 4, Suite 450
Austin, TX 78741




Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Stuart White
import org.apache.spark.sql.functions._

val rows = Seq(("90s", 1), ("80s", 2), ("80s", 3)).toDF("age", "gender")
rows.show

+---+--+
|age|gender|
+---+--+
|90s| 1|
|80s| 2|
|80s| 3|
+---+--+

val modifiedRows
  .select(
substring('age, 0, 2) as "age",
when('gender === 1, "male").otherwise(when('gender === 2,
"female").otherwise("unknown")) as "gender"
  )
modifiedRows.show

+---+---+
|age| gender|
+---+---+
| 90|   male|
| 80| female|
| 80|unknown|
+---+---+

On Thu, Nov 17, 2016 at 3:37 AM, 颜发才(Yan Facai)  wrote:
> Could you give me an example, how to use Column function?
> Thanks very much.
>
> On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot 
> wrote:
>>
>> Hi,
>>
>> You can use the Column functions provided by Spark API
>>
>>
>> https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html
>>
>> Hope this helps .
>>
>> Thanks,
>> Divya
>>
>>
>> On 17 November 2016 at 12:08, 颜发才(Yan Facai)  wrote:
>>>
>>> Hi,
>>> I have a sample, like:
>>> +---+--++
>>> |age|gender| city_id|
>>> +---+--++
>>> |   | 1|1042015:city_2044...|
>>> |90s| 2|1042015:city_2035...|
>>> |80s| 2|1042015:city_2061...|
>>> +---+--++
>>>
>>> and expectation is:
>>> "age":  90s -> 90, 80s -> 80
>>> "gender": 1 -> "male", 2 -> "female"
>>>
>>> I have two solutions:
>>> 1. Handle each column separately,  and then join all by index.
>>> val age = input.select("age").map(...)
>>> val gender = input.select("gender").map(...)
>>> val result = ...
>>>
>>> 2. Write utf function for each column, and then use in together:
>>>  val result = input.select(ageUDF($"age"), genderUDF($"gender"))
>>>
>>> However, both are awkward,
>>>
>>> Does anyone have a better work flow?
>>> Write some custom Transforms and use pipeline?
>>>
>>> Thanks.
>>>
>>>
>>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Hyukjin Kwon
Actually, CSV datasource supports encoding option[1] (although it does not
support non-ascii compatible encoding types).

[1]
https://github.com/apache/spark/blob/44c8bfda793b7655e2bd1da5e9915a09ed9d42ce/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L364
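
A sketch of how that option is used; cp1252 (Windows-1252) is just a common Windows default, and the real charset and path are assumptions here:

val df = spark.read
  .option("header", "true")
  .option("encoding", "cp1252")
  .csv("hdfs:///staging/windows_export.csv")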

On 17 Nov 2016 10:59 p.m., "ayan guha"  wrote:

> There is an utility called dos2unix. You can give it a try
>
> On 18 Nov 2016 00:20, "Jörn Franke"  wrote:
> >
> > You can do the conversion of character set (is this the issue?) as part
> of your loading process in Spark.
> > As far as i know the spark CSV package is based on Hadoop
> TextFileInputformat. This format to my best of knowledge supports only
> utf-8. So you have to do a conversion from windows to utf-8. If you refer
> to language specific settings (numbers, dates etc) - this is also not
> supported.
> >
> > I started to work on the hadoopoffice library (which you can use with
> Spark) where you can read Excel files directly (
> https://github.com/ZuInnoTe/hadoopoffice).However, there is no official
> release - yet. There you can specify also the language in which you want to
> represent data values, numbers etc. when reading the file.
> >
> > On 17 Nov 2016, at 14:11, Mich Talebzadeh 
> wrote:
> >
> >> Hi,
> >>
> >> In the past with Databricks package for csv files on occasions I had to
> do some cleaning at Linux directory level before ingesting CSV file into
> HDFS staging directory for Spark to read it.
> >>
> >> I have a more generic issue that may have to be ready.
> >>
> >> Assume that a provides using FTP to push CSV files into Windows
> directories. The whole solution is built around windows and .NET.
> >>
> >> Now you want to ingest those files into HDFS and process them with
> Spark CSV.
> >>
> >> One can create NFS directories visible to Windows server and HDFS
> as well. However, there may be issues with character sets etc. What are the
> best ways of handling this? One way would be to use some scripts to make
> these spreadsheet time files compatible with Linux and then load them into
> HDFS. For example I know that if I saved a Excel spresheet file with DOS
> FORMAT, that file will work OK with Spark CSV.  Are there tools to do this
> as well?
> >>
> >> Thanks
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> >>
> >>
>


ClassCastException when using SparkSQL Window function

2016-11-17 Thread Isabelle Phan
Hello,

I have a simple session table, which tracks pages users visited with a
sessionId. I would like to apply a window function by sessionId, but am
hitting a type cast exception. I am using Spark 1.5.0.

Here is sample code:
scala> df.printSchema
root
 |-- sessionid: string (nullable = true)
 |-- ip: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- page: string (nullable = true)

scala> df.withColumn("num",
rowNumber.over(Window.partitionBy("sessionid"))).show(10)

Here is the error stacktrace:
Caused by: java.lang.ClassCastException:
org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
at
org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
at
org.apache.spark.sql.catalyst.expressions.JoinedRow.getInt(JoinedRow.scala:82)
at
org.apache.spark.sql.catalyst.expressions.BoundReference.eval(BoundAttribute.scala:45)
at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:121)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:82)
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:61)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:330)
at
org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:215)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Has anyone encountered this problem before? Any pointers would be greatly
appreciated.
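
Not a confirmed diagnosis, but one thing worth checking: row_number is normally defined over an ordered window, and the spec above has no orderBy. A sketch of the ordered version, assuming the numbering should follow the timestamp column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

val w = Window.partitionBy("sessionid").orderBy("timestamp")
df.withColumn("num", rowNumber.over(w)).show(10)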


Thanks!

Isabelle


Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Mich Talebzadeh
Thanks Ayan.

That only works for extra characters like ^ characters etc. Unfortunately
it does not cure specific character sets.

cheers

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 17 November 2016 at 13:59, ayan guha  wrote:

> There is an utility called dos2unix. You can give it a try
>
> On 18 Nov 2016 00:20, "Jörn Franke"  wrote:
> >
> > You can do the conversion of character set (is this the issue?) as part
> of your loading process in Spark.
> > As far as i know the spark CSV package is based on Hadoop
> TextFileInputformat. This format to my best of knowledge supports only
> utf-8. So you have to do a conversion from windows to utf-8. If you refer
> to language specific settings (numbers, dates etc) - this is also not
> supported.
> >
> > I started to work on the hadoopoffice library (which you can use with
> Spark) where you can read Excel files directly (
> https://github.com/ZuInnoTe/hadoopoffice).However, there is no official
> release - yet. There you can specify also the language in which you want to
> represent data values, numbers etc. when reading the file.
> >
> > On 17 Nov 2016, at 14:11, Mich Talebzadeh 
> wrote:
> >
> >> Hi,
> >>
> >> In the past with Databricks package for csv files on occasions I had to
> do some cleaning at Linux directory level before ingesting CSV file into
> HDFS staging directory for Spark to read it.
> >>
> >> I have a more generic issue that may have to be ready.
> >>
> >> Assume that a provides using FTP to push CSV files into Windows
> directories. The whole solution is built around windows and .NET.
> >>
> >> Now you want to ingest those files into HDFS and process them with
> Spark CSV.
> >>
> >> One can create NFS directories visible to Windows server and HDFS
> as well. However, there may be issues with character sets etc. What are the
> best ways of handling this? One way would be to use some scripts to make
> these spreadsheet time files compatible with Linux and then load them into
> HDFS. For example I know that if I saved a Excel spresheet file with DOS
> FORMAT, that file will work OK with Spark CSV.  Are there tools to do this
> as well?
> >>
> >> Thanks
> >>
> >>
> >> Dr Mich Talebzadeh
> >>
> >>
> >>
> >> LinkedIn  https://www.linkedin.com/profile/view?id=
> AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >>
> >>
> >>
> >> http://talebzadehmich.wordpress.com
> >>
> >>
> >> Disclaimer: Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> >>
> >>
>


outlier detection using StreamingKMeans

2016-11-17 Thread Debasish Ghosh
Hello -

I am trying to implement an outlier detection application on streaming
data. I am a newbie to Spark and hence would like some advice on the
confusions that I have ..

I am thinking of using StreamingKMeans - is this a good choice ? I have one
stream of data and I need an online algorithm. But here are some questions
that immediately come to my mind ..

   1. I cannot do separate training, cross validation etc. Is this a good
   idea to do training and prediction online ?
   2. The data will be read from the stream coming from Kafka in
   microbatches of (say) 3 seconds. I get a DStream on which I train and
   get the clusters. How can I decide on the number of clusters ? Using
   StreamingKMeans is there any way I can iterate on microbatches with
   different values of k to find the optimal one ?
   3. Even if I fix k, after training on every microbatch I get a DStream.
   How can I compute things like a clustering score on the DStream?
   StreamingKMeansModel has a computeCost function but it takes an RDD. Maybe
   using DStream.foreachRDD { ... } can work, but I am not able to figure
   out how. How can we compute the cost of clustering for an unbounded stream of
   data? Any idiomatic way to handle this? (A sketch follows this list.)
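
On point 3, a minimal sketch of scoring each microbatch; it assumes the features have already been assembled into a DStream[Vector] called vectors, and k and the dimension are placeholders:

import org.apache.spark.mllib.clustering.StreamingKMeans

// vectors: DStream[Vector] is assumed to already exist (features parsed from
// the Kafka stream); k = 5 and the dimension 10 are placeholders.
val model = new StreamingKMeans()
  .setK(5)
  .setDecayFactor(1.0)
  .setRandomCenters(10, 0.0)   // dimension, initial weight

model.trainOn(vectors)

// Point 3: score every microbatch against the latest cluster centers.
vectors.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val wssse = model.latestModel().computeCost(rdd)
    println(s"WSSSE for this batch: $wssse")
  }
}

For point 2, one pragmatic approach is to run the same scoring with a few candidate values of k on a bounded, sampled slice of the data offline, rather than trying to tune k inside the live stream.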

Or is StreamingKMeans is not the right choice to do anomaly detection in an
online setting ..

any suggestion will be welcome ..

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread ayan guha
There is an utility called dos2unix. You can give it a try

On 18 Nov 2016 00:20, "Jörn Franke"  wrote:
>
> You can do the conversion of character set (is this the issue?) as part
of your loading process in Spark.
> As far as i know the spark CSV package is based on Hadoop
TextFileInputformat. This format to my best of knowledge supports only
utf-8. So you have to do a conversion from windows to utf-8. If you refer
to language specific settings (numbers, dates etc) - this is also not
supported.
>
> I started to work on the hadoopoffice library (which you can use with
Spark) where you can read Excel files directly (
https://github.com/ZuInnoTe/hadoopoffice).However, there is no official
release - yet. There you can specify also the language in which you want to
represent data values, numbers etc. when reading the file.
>
> On 17 Nov 2016, at 14:11, Mich Talebzadeh 
wrote:
>
>> Hi,
>>
>> In the past with Databricks package for csv files on occasions I had to
do some cleaning at Linux directory level before ingesting CSV file into
HDFS staging directory for Spark to read it.
>>
>> I have a more generic issue that may have to be ready.
>>
>> Assume that a provides using FTP to push CSV files into Windows
directories. The whole solution is built around windows and .NET.
>>
>> Now you want to ingest those files into HDFS and process them with Spark
CSV.
>>
>> One can create NFS directories visible to Windows server and HDFS
as well. However, there may be issues with character sets etc. What are the
best ways of handling this? One way would be to use some scripts to make
these spreadsheet time files compatible with Linux and then load them into
HDFS. For example I know that if I saved a Excel spresheet file with DOS
FORMAT, that file will work OK with Spark CSV.  Are there tools to do this
as well?
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
>>
>>


Re: Spark Streaming Data loss on failure to write BlockAdditionEvent failure to WAL

2016-11-17 Thread Dirceu Semighini Filho
Hi Arijit,
Have you find a solution for this? I'm facing the same problem in Spark
1.6.1, but here the error happens only a few times, so our hdfs does
support append.
This is what I can see in the logs:
2016-11-17 13:43:20,012 ERROR [BatchedWriteAheadLog Writer]
WriteAheadLogManager  for Thread: Failed to write to write ahead log after
3 failures




2016-11-08 14:47 GMT-02:00 Arijit :

> Thanks TD.
>
>
> Is "hdfs.append.support" a standard configuration? I see a seemingly
> equivalent configuration "dfs.support.append" that is used in our version
> of HDFS.
>
>
> In case we want to use a pseudo file-system (like S3)  which does not
> support append what are our options? I am not familiar with the code yet
> but is it possible to generate a new file whenever conflict of this sort
> happens?
>
>
> Thanks again, Arijit
> --
> *From:* Tathagata Das 
> *Sent:* Monday, November 7, 2016 7:59:06 PM
> *To:* Arijit
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark Streaming Data loss on failure to write
> BlockAdditionEvent failure to WAL
>
> For WAL in Spark to work with HDFS, the HDFS version you are running must
> support file appends. Contact your HDFS package/installation provider to
> figure out whether this is supported by your HDFS installation.
>
> On Mon, Nov 7, 2016 at 2:04 PM, Arijit  wrote:
>
>> Hello All,
>>
>>
>> We are using Spark 1.6.2 with WAL enabled and encountering data loss when
>> the following exception/warning happens. We are using HDFS as our
>> checkpoint directory.
>>
>>
>> Questions are:
>>
>>
>> 1. Is this a bug in Spark or issue with our configuration? Source looks
>> like the following. Which file already exist or who is suppose to set
>> hdfs.append.support configuration? Why doesn't it happen all the time?
>>
>>
>> private[streaming] object HdfsUtils {
>>
>>   def getOutputStream(path: String, conf: Configuration): FSDataOutputStream = {
>>     val dfsPath = new Path(path)
>>     val dfs = getFileSystemForPath(dfsPath, conf)
>>     // If the file exists and we have append support, append instead of creating a new file
>>     val stream: FSDataOutputStream = {
>>       if (dfs.isFile(dfsPath)) {
>>         if (conf.getBoolean("hdfs.append.support", false) ||
>>             dfs.isInstanceOf[RawLocalFileSystem]) {
>>           dfs.append(dfsPath)
>>         } else {
>>           throw new IllegalStateException("File exists and there is no append support!")
>>         }
>>       } else {
>>         dfs.create(dfsPath)
>>       }
>>     }
>>     stream
>>   }
>>
>>
>> 2. Why does the job not retry and eventually fail when this error occurs?
>> The job skips processing the exact number of events dumped in the log. For
>> this particular example I see 987 + 4686 events were not processed and are
>> lost for ever (does not recover even on restart).
>>
>>
>> 16/11/07 21:23:39 ERROR WriteAheadLogManager  for Thread: Failed to write
>> to write ahead log after 3 failures
>> 16/11/07 21:23:39 WARN BatchedWriteAheadLog: BatchedWriteAheadLog Writer
>> failed to write ArrayBuffer(Record(java.nio.HeapByteBuffer[pos=1212
>> lim=1212 cap=1212],1478553818985,scala.concurrent.impl.Promise$Defaul
>> tPromise@5ce88cb6), Record(
>> java.nio.HeapByteBuffer[pos=1212 lim=1212 cap=1212],1478553818985,scala.
>> concurrent.impl.Promise$DefaultPromise@6d8f1feb))
>> java.lang.IllegalStateException: File exists and there is no append
>> support!
>> at org.apache.spark.streaming.util.HdfsUtils$.getOutputStream(
>> HdfsUtils.scala:35)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter
>> .org$apache$spark$streaming$util$FileBasedWriteAheadLogWrite
>> r$$stream$lzycompute(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter
>> .org$apache$spark$streaming$util$FileBasedWriteAheadLogWrite
>> r$$stream(FileBasedWriteAheadLogWriter.scala:33)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter
>> .(FileBasedWriteAheadLogWriter.scala:41)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.getLo
>> gWriter(FileBasedWriteAheadLog.scala:217)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write
>> (FileBasedWriteAheadLog.scala:86)
>> at org.apache.spark.streaming.util.FileBasedWriteAheadLog.write
>> (FileBasedWriteAheadLog.scala:48)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog.org$apa
>> che$spark$streaming$util$BatchedWriteAheadLog$$flushRecords(
>> BatchedWriteAheadLog.scala:173)
>> at org.apache.spark.streaming.util.BatchedWriteAheadLog$$anon$
>> 1.run(BatchedWriteAheadLog.scala:140)
>> at java.lang.Thread.run(Thread.java:745)
>> 16/11/07 21:23:39 WARN ReceivedBlockTracker: Exception thrown while
>> writing record: BlockAdditionEvent(ReceivedBlockInfo(2,Some(987
>> 

Re: Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Jörn Franke
You can do the conversion of character set (is this the issue?) as part of your 
loading process in Spark.
As far as I know the Spark CSV package is based on Hadoop's TextInputFormat. 
To the best of my knowledge this format supports only UTF-8, so you have to do a 
conversion from the Windows encoding to UTF-8. If you are referring to language-specific 
settings (numbers, dates etc.) - this is also not supported.

I started to work on the hadoopoffice library (which you can use with Spark) 
where you can read Excel files directly 
(https://github.com/ZuInnoTe/hadoopoffice). However, there is no official 
release yet. There you can also specify the language in which you want to 
represent data values, numbers etc. when reading the file.
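
A minimal sketch of what I mean by doing the conversion at load time (untested;
the paths and the "windows-1252" charset are only placeholders for whatever your
source actually uses):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("csv-charset-conversion").getOrCreate()
val sc = spark.sparkContext

// Read the raw bytes of each line and decode them explicitly, instead of letting
// the text input format assume UTF-8.
val decoded = sc
  .hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///staging/windows_input")
  .map { case (_, line) => new String(line.getBytes, 0, line.getLength, "windows-1252") }

// Write the decoded lines back out (saveAsTextFile produces UTF-8) and let the
// normal CSV reader pick them up.
decoded.saveAsTextFile("hdfs:///staging/utf8_input")
val df = spark.read.option("header", "true").csv("hdfs:///staging/utf8_input")

I believe the csv reader also exposes an encoding/charset option, but I have not
verified how it behaves with Windows code pages, so the explicit decode above is
the safer route.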

> On 17 Nov 2016, at 14:11, Mich Talebzadeh  wrote:
> 
> Hi,
> 
> In the past with Databricks package for csv files on occasions I had to do 
> some cleaning at Linux directory level before ingesting CSV file into HDFS 
> staging directory for Spark to read it.
> 
> I have a more generic issue that may need to be addressed.
> 
> Assume that a provider uses FTP to push CSV files into Windows directories. 
> The whole solution is built around Windows and .NET.
> 
> Now you want to ingest those files into HDFS and process them with Spark CSV.
> 
> One can create NFS directories visible to Windows server and HDFS as well. 
> However, there may be issues with character sets etc. What are the best ways 
> of handling this? One way would be to use some scripts to make these 
> spreadsheet-type files compatible with Linux and then load them into HDFS. 
> For example I know that if I save an Excel spreadsheet file in DOS format, 
> that file will work OK with Spark CSV.  Are there tools to do this as well?
> 
> Thanks
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  


Handling windows characters with Spark CSV on Linux

2016-11-17 Thread Mich Talebzadeh
Hi,

In the past with Databricks package for csv files on occasions I had to do
some cleaning at Linux directory level before ingesting CSV file into HDFS
staging directory for Spark to read it.

I have a more generic issue that may need to be addressed.

Assume that a provider uses FTP to push CSV files into Windows
directories. The whole solution is built around Windows and .NET.

Now you want to ingest those files into HDFS and process them with Spark
CSV.

One can create NFS directories visible to Windows server and HDFS as well.
However, there may be issues with character sets etc. What are the best
ways of handling this? One way would be to use some scripts to make these
spreadsheet-type files compatible with Linux and then load them into HDFS.
For example I know that if I save an Excel spreadsheet file in DOS format,
that file will work OK with Spark CSV.  Are there tools to do this as well?

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


RE: Nested UDFs

2016-11-17 Thread Mendelson, Assaf
Then you probably want to create a normal function as opposed to a UDF.
A UDF takes your function and applies it on each element in the column one 
after the other. You can think of it as working on the result of a loop 
iterating on the column.

pyspark.sql.functions.regexp_replace receives a column and applies the regex on 
each element to create a new column.
You can do it in one of two ways:
The first is using a UDF, in which case you shouldn't use 
pyspark.sql.functions.regexp_replace but instead use standard Python regexes (the re module).
The second is to simply apply the column changes one after the other in a 
function. This should be something like:
def my_f(target_col):
    for match, repl in regexp_list:
        target_col = regexp_replace(target_col, match, repl)
    return target_col

and then use it with:
  test_data.select(my_f(test_data.name))

The second option is more correct and should provide better performance.

From: Perttu Ranta-aho [mailto:ranta...@iki.fi]
Sent: Thursday, November 17, 2016 1:50 PM
To: user@spark.apache.org
Subject: Re: Nested UDFs

Hi,

My example was a little bogus; my real use case is to do multiple regexp 
replacements so something like:

def my_f(data):
    for match, repl in regexp_list:
        data = regexp_replace(data, match, repl)
    return data

I could achieve my goal with multiple .select(regexp_replace()) lines, but one UDF 
would be nicer.

-Perttu

On Thu, 17 November 2016 at 9:42, Mendelson, Assaf 
> wrote:
Regexp_replace is supposed to receive a column, you don’t need to write a UDF 
for it.
Instead try:
test_data.select(regexp_replace(test_data.name, 'a', 'X'))

You would need a UDF if you wanted to do something on the string value of 
a single row (e.g. return data + “bla”)

Assaf.

From: Perttu Ranta-aho [mailto:ranta...@iki.fi]
Sent: Thursday, November 17, 2016 9:15 AM
To: user@spark.apache.org
Subject: Nested UDFs

Hi,

Shouldn't this work?

from pyspark.sql.functions import regexp_replace, udf

def my_f(data):
return regexp_replace(data, 'a', 'X')
my_udf = udf(my_f)

test_data = sqlContext.createDataFrame([('a',), ('b',), ('c',)], ('name',))
test_data.select(my_udf(test_data.name)).show()

But instead of 'a' being replaced with 'X' I get exception:
  File 
".../spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",
 line 1471, in regexp_replace
jc = sc._jvm.functions.regexp_replace(_to_java_column(str), pattern, 
replacement)
AttributeError: 'NoneType' object has no attribute '_jvm'

???

-Perttu



Re: Nested UDFs

2016-11-17 Thread Perttu Ranta-aho
Hi,

My example was a little bogus; my real use case is to do multiple regexp
replacements so something like:

def my_f(data):
    for match, repl in regexp_list:
        data = regexp_replace(data, match, repl)
    return data

I could achieve my goal with multiple .select(regexp_replace()) lines, but one
UDF would be nicer.

-Perttu

On Thu, 17 November 2016 at 9:42, Mendelson, Assaf 
wrote:

> Regexp_replace is supposed to receive a column, you don’t need to write a
> UDF for it.
>
> Instead try:
>
> test_data.select(regexp_replace(test_data.name, 'a', 'X'))
>
>
>
> You would need a UDF if you wanted to do something on the string
> value of a single row (e.g. return data + “bla”)
>
>
>
> Assaf.
>
>
>
> *From:* Perttu Ranta-aho [mailto:ranta...@iki.fi]
> *Sent:* Thursday, November 17, 2016 9:15 AM
> *To:* user@spark.apache.org
> *Subject:* Nested UDFs
>
>
>
> Hi,
>
>
>
> Shouldn't this work?
>
>
>
> from pyspark.sql.functions import regexp_replace, udf
>
>
>
> def my_f(data):
>
> return regexp_replace(data, 'a', 'X')
>
> my_udf = udf(my_f)
>
>
>
> test_data = sqlContext.createDataFrame([('a',), ('b',), ('c',)], ('name',))
>
> test_data.select(my_udf(test_data.name)).show()
>
>
>
> But instead of 'a' being replaced with 'X' I get exception:
>
>   File
> ".../spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/functions.py",
> line 1471, in regexp_replace
>
> jc = sc._jvm.functions.regexp_replace(_to_java_column(str), pattern,
> replacement)
>
> AttributeError: 'NoneType' object has no attribute '_jvm'
>
>
>
> ???
>
>
>
> -Perttu
>
>
>


HDPCD SPARK Certification Queries

2016-11-17 Thread Aakash Basu
Hi all,


I want to know more about this examination -
http://hortonworks.com/training/certification/exam-objectives/#hdpcdspark


If anyone here has appeared for the examination, can you kindly help?

1) What are the kind of questions that come,

2) Samples,

3) All the other details.

Thanks,
Aakash.


Join Query

2016-11-17 Thread Aakash Basu
Hi,




Conceptually I can understand the Spark joins below, but when it comes to
implementation I don't find much information on Google. Please help me with
code/pseudo-code for the joins below using Java Spark or Scala Spark.



*Replication Join:*

Given two datasets, where one is small enough to fit into
the memory, perform a Replicated join using Spark.

Note: Need a program to justify this fits for Replication Join.
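
My current guess is that the Replication Join above corresponds to Spark's
broadcast join; a rough, untested sketch (paths and the join key are made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("replication-join").getOrCreate()

// Hypothetical inputs: "small" is assumed to fit in memory on every executor.
val large = spark.read.parquet("hdfs:///data/transactions")
val small = spark.read.parquet("hdfs:///data/country_dim")

// broadcast() replicates the small side to every executor, so the join is done
// map-side without shuffling the large side.
val joined = large.join(broadcast(small), Seq("country_id"))
joined.explain(true)  // the physical plan should show a BroadcastHashJoin

(Not sure whether this is the right direction, though.)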



*Semi-Join:*

Given a huge dataset, do a semi-join using spark. Note
that, with semi-join, one dataset needs to do Filter and projection to fit
into the cache.

Note: Need a program to justify this fits for Semi-Join.





*Composite Join:*

               Given a dataset that is still too big after
filtering and cannot fit into memory, perform a composite join on
pre-sorted and pre-partitioned data using Spark.

Note: Need a program to justify this fits for composite Join.





*Repartition join:*

Join two datasets by performing Repartition join in spark.

Note: Need a program to justify this fits for repartition Join.






Thanks,

Aakash.


Re: Best practice for preprocessing feature with DataFrame

2016-11-17 Thread Yan Facai
Could you give me an example of how to use the Column functions?
Thanks very much.
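
Is something like this the idea? An untested sketch against the sample in my
earlier mail, assuming the DataFrame is called input and that unmatched values
may simply become null:

import org.apache.spark.sql.functions.{col, regexp_replace, when}

val result = input
  // "90s" -> 90, "80s" -> 80: strip the trailing "s" and cast to int
  .withColumn("age", regexp_replace(col("age"), "s$", "").cast("int"))
  // 1 -> "male", 2 -> "female"; anything else becomes null
  .withColumn("gender",
    when(col("gender") === 1, "male")
      .when(col("gender") === 2, "female"))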

On Thu, Nov 17, 2016 at 12:23 PM, Divya Gehlot 
wrote:

> Hi,
>
> You can use the Column functions provided by Spark API
>
> https://spark.apache.org/docs/1.6.2/api/java/org/apache/
> spark/sql/functions.html
>
> Hope this helps .
>
> Thanks,
> Divya
>
>
> On 17 November 2016 at 12:08, 颜发才(Yan Facai)  wrote:
>
>> Hi,
>> I have a sample, like:
>> +---+--++
>> |age|gender| city_id|
>> +---+--++
>> |   | 1|1042015:city_2044...|
>> |90s| 2|1042015:city_2035...|
>> |80s| 2|1042015:city_2061...|
>> +---+--++
>>
>> and expectation is:
>> "age":  90s -> 90, 80s -> 80
>> "gender": 1 -> "male", 2 -> "female"
>>
>> I have two solutions:
>> 1. Handle each column separately,  and then join all by index.
>> val age = input.select("age").map(...)
>> val gender = input.select("gender").map(...)
>> val result = ...
>>
>> 2. Write a udf for each column, and then use them together:
>>  val result = input.select(ageUDF($"age"), genderUDF($"gender"))
>>
>> However, both are awkward,
>>
>> Does anyone have a better work flow?
>> Write some custom Transforms and use pipeline?
>>
>> Thanks.
>>
>>
>>
>>
>


Another Interesting Question on SPARK SQL

2016-11-17 Thread kant kodali
[inline image: query-planning diagram, not preserved in the archive]
Which parts in the diagram above are executed by DataSource connectors and
which parts are executed by Tungsten? Or, to put it another way, in which
phase in the diagram above does Tungsten leverage the DataSource
connectors (such as, say, the Cassandra connector)?

My understanding so far is that connectors come in during the physical planning
phase, but I am not sure whether the connectors take the logical plan as input.

Thanks,
kant


why is method predict protected in PredictionModel

2016-11-17 Thread wobu
Hi,

We were using Spark 1.3.1 for a long time and now we want to upgrade to the 2.0
release.
So until today we have used the mllib package and the RDD API.

Now I'm trying to refactor our mllib NaiveBayesClassifier to the new "ml"
API.

*The Question:*
why is the method "predict" in the class "PredictionModel" of package "ml"
protected?

I am trying to find a way to load a saved prediction model and predict
values without having to use the pipeline or Transformer API.
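
For context, what I do today looks roughly like the following (untested sketch,
the model path is a placeholder), and it is exactly this detour through a
one-row DataFrame and transform() that I would like to avoid:

import org.apache.spark.ml.classification.NaiveBayesModel
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("single-prediction").getOrCreate()

// Load the previously saved ml model.
val model = NaiveBayesModel.load("/models/naive-bayes")

// To predict a single feature vector I have to wrap it in a DataFrame...
val single = spark.createDataFrame(Seq(
  Tuple1(Vectors.dense(0.1, 0.2, 0.3))
)).toDF("features")

// ...and go through transform(), because predict() is protected.
model.transform(single).select("prediction").show()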


Best regards

wobu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-is-method-predict-protected-in-PredictionModel-tp28095.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka segmentation

2016-11-17 Thread Hoang Bao Thien
Hi,

I push CSV and other text files to Kafka just to test Kafka + Spark
Streaming using the direct stream. That's why I don't want Spark Streaming
to read the CSVs or text files directly.
In addition, I don't want a giant batch of records like the link you sent.
The problem is that we should receive a similar number of records in all
batches, instead of the first two or three batches having very large numbers of
records (e.g., 100K) while the last 1000 batches have only 200 records each.

I know that the problem is not from the auto.offset.reset=largest, but I
don't know what I can do in this case.

Could you or anyone else please suggest some solutions, as this seems to be
a common situation with Kafka + Spark Streaming.

Thanks.
Alex
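
For reference, these are the rate-control settings I understand to be relevant
here (the values are only examples and this is untested on my data):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("kafka-csv-to-hive")
  // Let Spark adapt the ingestion rate to the actual processing speed.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard cap per Kafka partition per second, so the first batches cannot explode.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val ssc = new StreamingContext(conf, Seconds(5))

Would setting these be the right way to even out the batch sizes, or am I
missing something else?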



On Thu, Nov 17, 2016 at 2:32 AM, Cody Koeninger  wrote:

> Yeah, if you're reporting issues, please be clear as to whether
> backpressure is enabled, and whether maxRatePerPartition is set.
>
> I expect that there is something wrong with backpressure, see e.g.
> https://issues.apache.org/jira/browse/SPARK-18371
>
> On Wed, Nov 16, 2016 at 5:05 PM, bo yang  wrote:
> > I hit similar issue with Spark Streaming. The batch size seemed a little
> > random. Sometime it was large with many Kafka messages inside same batch,
> > sometimes it was very small with just a few messages. Is it possible that
> > was caused by the backpressure implementation in Spark Streaming?
> >
> > On Wed, Nov 16, 2016 at 4:22 PM, Cody Koeninger 
> wrote:
> >>
> >> Moved to user list.
> >>
> >> I'm not really clear on what you're trying to accomplish (why put the
> >> csv file through Kafka instead of reading it directly with spark?)
> >>
> >> auto.offset.reset=largest just means that when starting the job
> >> without any defined offsets, it will start at the highest (most
> >> recent) available offsets.  That's probably not what you want if
> >> you've already loaded csv lines into kafka.
> >>
> >> On Wed, Nov 16, 2016 at 2:45 PM, Hoang Bao Thien  >
> >> wrote:
> >> > Hi all,
> >> >
> >> > I would like to ask a question related to the size of Kafka stream. I
> >> > want
> >> > to put data (e.g., file *.csv) to Kafka then use Spark streaming to
> get
> >> > the
> >> > output from Kafka and then save to Hive by using SparkSQL. The file
> csv
> >> > is
> >> > about 100MB with ~250K messages/rows (Each row has about 10 fields of
> >> > integer). I see that Spark Streaming first received two
> >> > partitions/batches,
> >> > the first is of 60K messages and the second is of 50K msgs. But from
> the
> >> > third batch, Spark just received 200 messages for each batch (or
> >> > partition).
> >> > I think that this problem is coming from Kafka or some configuration
> in
> >> > Spark. I already tried to configure with the setting
> >> > "auto.offset.reset=largest", but every batch only gets 200 messages.
> >> >
> >> > Could you please tell me how to fix this problem?
> >> > Thank you so much.
> >> >
> >> > Best regards,
> >> > Alex
> >> >
> >>
> >> -
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>


Re: [SQL/Catalyst] Janino Generated Code Debugging

2016-11-17 Thread Takeshi Yamamuro
Hi,

Not sure what you'd like to do, though; is this not enough?
import org.apache.spark.sql.execution.debug._
sql("SELECT 1").debugCodegen()

// maropu


On Thu, Nov 17, 2016 at 3:59 AM, Aleksander Eskilson <
aleksander...@gmail.com> wrote:

> Hi there,
>
> I have some jobs generating Java code (via Janino) that I would like to
> inspect more directly during runtime. The Janino page seems to indicate an
> environmental variable can be set to support debugging the generated code,
> allowing one to step into it directly and inspect variables and set
> breakpoints. I'm using Intellij and setting both
>
> -Dorg.codehaus.janino.source_debugging.enable=true
> -Dorg.codehaus.janino.source_debugging.dir=/Users/username/
> path/to/project/src
>
> but when I begin debug, I can't seem to view the generated code, even if I
> set a breakpoint to the location that calls it and attempt to step into the
> code, or reference a line of the stacktrace that should take me into the
> code. Any idea how to properly set Janino to debug the Catalyst-generated
> code more directly?
>
> Best,
> Alek
>



-- 
---
Takeshi Yamamuro


Re: How does predicate push down really help?

2016-11-17 Thread kant kodali
Thanks for the effort and clear explanation.

On Thu, Nov 17, 2016 at 12:07 AM, kant kodali  wrote:

> Yes, that's how I understood it from your first email as well, but the key
> thing here sounds like some data sources may not support operators such as
> filter and so on, in which case Spark still needs to work and be able to
> apply the filter operation in memory after grabbing all the rows into memory.
>
>
>
> On Wed, Nov 16, 2016 at 11:56 PM, Mendelson, Assaf <
> assaf.mendel...@rsa.com> wrote:
>
>> In the first example, you define the table to be table users from some
>> SQL server. Then you perform a filter.
>>
>> Without predicate pushdown (or any optimization) basically spark
>> understand this as follows:
>>
>> “grab the data from the source described” (which in this case means get
>> all of the table from the external sql server to spark memory)
>>
>> “do the operations I asked for” (in this case filtering).
>>
>> What predicate pushdown means in this case is that since spark knows an
>> external SQL server can actually understand and benefit from the filter
>> command it can actually send the filter as part of the query and then once
>> the data arrives in spark, it is already filtered.
>>
>>
>>
>> In the second example we have two tables A and B. What you ask in the
>> command is:
>>
>> “Read A”
>>
>> “Read B”
>>
>> “Perform the join” (which is a heavy operation)
>>
>> “Perform the filtering on the result”
>>
>>
>>
>> What predicate pushdown would do instead is translate it to:
>>
>> “Read A”
>>
>> “Perform filtering on A”
>>
>> “Read B”
>>
>> “Perform filtering on B”
>>
>> “perform the join on the filtered A and B”
>>
>> Now the join is being made on smaller data (after the filtering) and
>> therefore takes less time. The heuristic is that in most cases the time
>> saved on the join would be much more than any extra time taken by the
>> filter itself.
>>
>>
>>
>> BTW. You can see the differences between the original plan and the
>> optimized plan by calling explain(true) on the dataframe.  This would show
>> you what was parsed, how the optimization worked and what was physically
>> run.
>>
>>
>>
>> Assaf.
>>
>>
>>
>> *From:* kant kodali [mailto:kanth...@gmail.com]
>> *Sent:* Thursday, November 17, 2016 9:50 AM
>> *To:* Mendelson, Assaf
>> *Cc:* user @spark
>> *Subject:* Re: How does predicate push down really help?
>>
>>
>>
>> Hi Assaf,
>>
>>
>>
>> I am still trying to understand the merits of predicate push down from
>> the examples you pointed out.
>>
>>
>>
>> Example 1: Say we don't have a predicate push down feature why does spark
>> needs to pull all the rows and filter it in memory? why not simply issue
>> select statement with "where" clause to do the filtering via JDBC or
>> something?
>>
>>
>>
>> Example 2: Same Argument as Example 1 except when we don't have a
>> predicate push down feature we could simply do it using JOIN and where
>> operators in the SQL statement right.
>>
>>
>>
>> I feel like I am missing something to understand the merits of predicate
>> push down.
>>
>>
>>
>> Thanks,
>>
>> kant
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf <
>> assaf.mendel...@rsa.com> wrote:
>>
>> Actually, both of these translate to the same plan.
>>
>> When you do sql(“some code”) or filter, it doesn’t actually do the query.
>> Instead it is translated to a plan (parsed plan) which transform everything
>> into standard spark expressions. Then spark analyzes it to fill in the
>> blanks (what is users table for example) and attempts to optimize it.
>> Predicate pushdown happens in the optimization portion.
>>
>> For example, let’s say that users would actually be backed by a table on
>> an sql query in mysql.
>>
>> Without predicate pushdown spark would first pull the entire users table
>> from mysql and only then do the filtering. Predicate pushdown would mean
>> the filtering would be done as part of the original sql query.
>>
>>
>>
>> Another (probably better) example would be something like having two
>> table A and B which are joined by some common key. Then a filtering is done
>> on the key. Moving the filter to be before the join would probably make
>> everything faster as filter is a faster operation than a join.
>>
>>
>>
>> Assaf.
>>
>>
>>
>> *From:* kant kodali [mailto:kanth...@gmail.com]
>> *Sent:* Thursday, November 17, 2016 8:03 AM
>> *To:* user @spark
>> *Subject:* How does predicate push down really help?
>>
>>
>>
>> How does predicate push down really help? in the following cases
>>
>>
>>
>> val df1 = spark.sql("select * from users where age > 30")
>>
>>
>>
>>  vs
>>
>>
>>
>> val df1 = spark.sql("select * from users")
>>
>> df.filter("age > 30")
>>
>>
>>
>>
>>
>>
>>
>
>


Re: How does predicate push down really help?

2016-11-17 Thread kant kodali
Yes, that's how I understood it from your first email as well, but the key
thing here sounds like some data sources may not support operators such as
filter and so on, in which case Spark still needs to work and be able to
apply the filter operation in memory after grabbing all the rows into memory.
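
A quick way I plan to check this, based on your explain(true) suggestion
(untested sketch; the JDBC connection details are made up, and spark is the
SparkSession from spark-shell):

val users = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/mydb")  // hypothetical connection details
  .option("dbtable", "users")
  .load()

users.filter("age > 30").explain(true)
// If the source supports pushdown, the physical plan should list the predicate
// under PushedFilters; otherwise Spark keeps a separate Filter operator and
// applies it in memory after pulling the rows.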



On Wed, Nov 16, 2016 at 11:56 PM, Mendelson, Assaf 
wrote:

> In the first example, you define the table to be table users from some SQL
> server. Then you perform a filter.
>
> Without predicate pushdown (or any optimization) basically spark
> understand this as follows:
>
> “grab the data from the source described” (which in this case means get
> all of the table from the external sql server to spark memory)
>
> “do the operations I asked for” (in this case filtering).
>
> What predicate pushdown means in this case is that since spark knows an
> external SQL server can actually understand and benefit from the filter
> command it can actually send the filter as part of the query and then once
> the data arrives in spark, it is already filtered.
>
>
>
> In the second example we have two tables A and B. What you ask in the
> command is:
>
> “Read A”
>
> “Read B”
>
> “Perform the join” (which is a heavy operation)
>
> “Perform the filtering on the result”
>
>
>
> What predicate pushdown would do instead is translate it to:
>
> “Read A”
>
> “Perform filtering on A”
>
> “Read B”
>
> “Perform filtering on B”
>
> “perform the join on the filtered A and B”
>
> Now the join is being made on smaller data (after the filtering) and
> therefore takes less time. The heuristic is that in most cases the time
> saved on the join would be much more than any extra time taken by the
> filter itself.
>
>
>
> BTW. You can see the differences between the original plan and the
> optimized plan by calling explain(true) on the dataframe.  This would show
> you what was parsed, how the optimization worked and what was physically
> run.
>
>
>
> Assaf.
>
>
>
> *From:* kant kodali [mailto:kanth...@gmail.com]
> *Sent:* Thursday, November 17, 2016 9:50 AM
> *To:* Mendelson, Assaf
> *Cc:* user @spark
> *Subject:* Re: How does predicate push down really help?
>
>
>
> Hi Assaf,
>
>
>
> I am still trying to understand the merits of predicate push down from the
> examples you pointed out.
>
>
>
> Example 1: Say we don't have a predicate push down feature why does spark
> needs to pull all the rows and filter it in memory? why not simply issue
> select statement with "where" clause to do the filtering via JDBC or
> something?
>
>
>
> Example 2: Same Argument as Example 1 except when we don't have a
> predicate push down feature we could simply do it using JOIN and where
> operators in the SQL statement right.
>
>
>
> I feel like I am missing something to understand the merits of predicate
> push down.
>
>
>
> Thanks,
>
> kant
>
>
>
>
>
>
>
>
>
> On Wed, Nov 16, 2016 at 11:34 PM, Mendelson, Assaf <
> assaf.mendel...@rsa.com> wrote:
>
> Actually, both of these translate to the same plan.
>
> When you do sql(“some code”) or filter, it doesn’t actually do the query.
> Instead it is translated to a plan (parsed plan) which transform everything
> into standard spark expressions. Then spark analyzes it to fill in the
> blanks (what is users table for example) and attempts to optimize it.
> Predicate pushdown happens in the optimization portion.
>
> For example, let’s say that users would actually be backed by a table on
> an sql query in mysql.
>
> Without predicate pushdown spark would first pull the entire users table
> from mysql and only then do the filtering. Predicate pushdown would mean
> the filtering would be done as part of the original sql query.
>
>
>
> Another (probably better) example would be something like having two table
> A and B which are joined by some common key. Then a filtering is done on
> the key. Moving the filter to be before the join would probably make
> everything faster as filter is a faster operation than a join.
>
>
>
> Assaf.
>
>
>
> *From:* kant kodali [mailto:kanth...@gmail.com]
> *Sent:* Thursday, November 17, 2016 8:03 AM
> *To:* user @spark
> *Subject:* How does predicate push down really help?
>
>
>
> How does predicate push down really help? in the following cases
>
>
>
> val df1 = spark.sql("select * from users where age > 30")
>
>
>
>  vs
>
>
>
> val df1 = spark.sql("select * from users")
>
> df.filter("age > 30")
>
>
>
>
>
>
>