unsubscribe

2018-02-27 Thread 学生张洪斌



学生张洪斌
Email: hongbinzh...@163.com

Signature customized by NetEase Mail Master

Re: Data loss in spark job

2018-02-27 Thread yncxcw
Hi,

Please check whether your OS allows memory overcommit. I suspect this is caused by
your OS forbidding memory overcommitment and killing the process when
overcommitment is detected (the Spark executor is the process chosen to be killed).
This is why you receive SIGTERM: the executor fails on the signal and
loses your data.

Please check /proc/sys/vm/overcommit_memory and set it accordingly:

/proc/sys/vm/overcommit_memory
This switch knows 3 different settings:

0: The Linux kernel is free to overcommit memory (this is the default), a
heuristic algorithm is applied to figure out if enough memory is available.
1: The Linux kernel will always overcommit memory, and never check if enough
memory is available. This increases the risk of out-of-memory situations,
but can also improve performance for memory-intensive workloads.
2: The Linux kernel will not overcommit memory, and only allocate as much
memory as defined in overcommit_ratio.

Another way is to just decrease the JVM heap size by setting a smaller -Xmx,
reducing the amount of memory the JVM asks the OS to reserve.
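
For example (just a sketch, with an illustrative value), you can cap the executor
heap through the Spark configuration, which becomes the executor JVM's -Xmx:

import org.apache.spark.sql.SparkSession

object HeapCapSketch {
  def main(args: Array[String]): Unit = {
    // spark.executor.memory becomes the executor JVM's -Xmx; "4g" is just an example value.
    val spark = SparkSession.builder()
      .appName("heap-cap-sketch")
      .config("spark.executor.memory", "4g")
      .getOrCreate()

    // ... run the job as usual ...
    spark.stop()
  }
}

The same setting can also be passed on the command line as --executor-memory.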

Thanks!

Wei



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Data loss in spark job

2018-02-27 Thread Faraz Mateen
Hi,

I saw the following error message in executor logs:

*Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x000662f0, 520093696, 0) failed; error='Cannot
allocate memory' (errno=12)*

By increasing RAM of my nodes to 40 GB each, I was able to get rid of RPC
connection failures. However, the results I am getting after copying data
are still incorrect.

Before termination, executor logs have this error message:

*ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM*

I believe the executors are not shutting down gracefully and that is
causing spark to lose some data.
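
For reference, this is roughly how I am comparing the counts through the
spark-cassandra-connector DataFrame reader (the keyspace and table names below
are placeholders, not the real ones):

import org.apache.spark.sql.SparkSession

object CountCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("count-check").getOrCreate()

    // Read a Cassandra table through the connector and count its rows.
    def cassandraCount(keyspace: String, table: String): Long =
      spark.read
        .format("org.apache.spark.sql.cassandra")
        .options(Map("keyspace" -> keyspace, "table" -> table))
        .load()
        .count()

    val src = cassandraCount("my_keyspace", "data")
    val dst = cassandraCount("my_keyspace", "data_by_field")
    println(s"source=$src destination=$dst missing=${src - dst}")

    spark.stop()
  }
}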

Can anyone please explain how I can further debug this?

Thanks,
Faraz

On Mon, Feb 26, 2018 at 4:46 PM, Faraz Mateen  wrote:

> Hi,
>
> I think I have a situation where spark is silently failing to write data
> to my Cassandra table. Let me explain my current situation.
>
> I have a table consisting of around 402 million records. The table
> consists of 84 columns. Table schema is something like this:
>
>
> *id (text)  |   datetime (timestamp)  |   field1 (text)  |  ...  |   field84 (text)*
>
>
> To optimize queries on the data, I am splitting it into multiple tables
> using the spark job mentioned below. Each separated table must have data from
> just one field of the source table. The new table has the following structure:
>
>
> *id (text)  |   datetime (timestamp)  |   day (date)  |   value (text)*
>
>
> where, "value" column will contain the field column from the source table.
> Source table has around *402 million* records which is around *85 GB* of
> data distributed on *3 nodes (27 + 32 + 26)*. New table being populated
> is supposed to have the same number of records but it is missing some data.
>
> Initially, I assumed some problem with the data in the source table. So, I
> copied 1 week of data from the source table into another table with the
> same schema. Then I split the data like I did before, but this time the field
> specific table had the same number of records as the source table. I
> repeated this again with another data set from another time period, and
> again the number of records in the field specific table was equal to the
> number of records in the source table.
>
> This has led me to believe that there is some problem with spark's
> handling of large data sets. Here is my spark-submit command to separate the
> data:
>
> ~/spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://10.128.0.18:7077
> --packages datastax:spark-cassandra-connector:2.0.1-s_2.11
> --conf spark.cassandra.connection.host="10.128.1.1,10.128.1.2,10.128.1.3"
> --conf "spark.storage.memoryFraction=1" --conf spark.local.dir=/media/db/
> --executor-memory 10G --num-executors=6 --executor-cores=3
> --total-executor-cores 18 split_data.py
>
>
> *split_data.py* is the name of my pyspark application. It is essentially
> executing the following query:
>
>
> *("select id,datetime,DATE_FORMAT(datetime,'-MM-dd') as day, "+field+"
> as value  from data  " )*
>
> The spark job does not crash after these errors and warnings. However when
> I check the number of records in the new table, it is always less than the
> number of records in source table. Moreover, the number of records in
> destination table is not the same after each run of the query. I changed
> logging level for spark submit to WARN and saw the following WARNINGS and
> ERRORS on the console:
>
> https://gist.github.com/anonymous/e05f1aaa131348c9a5a9a2db6d141f8c#file-gistfile1-txt
>
> My cluster consists of *3 gcloud VMs*. A Spark and a Cassandra node are
> deployed on each VM.
> Each VM has *8 cores* of CPU and* 30 GB* RAM. Spark is deployed in
> standalone cluster mode.
> Spark version is *2.1.0*
> I am using datastax spark cassandra connector version *2.0.1*
> Cassandra Version is *3.9*
> Each spark executor is allowed 10 GB of RAM and there are 2 executors
> running on each node.
>
> Is the problem related to my machine resources? How can I root cause or
> fix this?
> Any help will be greatly appreciated.
>
> Thanks,
> Faraz
>


Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread kant kodali
I see! I get the logic now!

On Tue, Feb 27, 2018 at 5:55 PM, naresh Goud wrote:

> Hi Kant,
>
> TD's explanation makes a lot of sense. Refer to this Stack Overflow answer, where
> it was explained with program output.  Hope this helps.
>
> https://stackoverflow.com/questions/45579100/structured-streaming-watermark-vs-exactly-once-semantics
>
>
>
>
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>
> On Tue, Feb 27, 2018 at 7:45 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
>
>> Let me answer the original question directly, that is, how do we
>> determine that an event is late. We simply track the maximum event time the
>> engine has seen in the data it has processed till now. And any data that
>> has event time less than the max is basically "late" (as it is
>> out-of-order). Now, in a distributed setting, it is very hard to define
>> whether each record is late or not, because it is hard to have a consistent
>> definition of max-event-time-seen. Fortunately, we don't have to do this
>> precisely because we don't really care whether a record is "late"; we only
>> care whether a record is "too late", that is, older than the watermark
>> (= max-event-time-seen - watermark-delay). As the programming guide says, if
>> data is "late" but not "too late" we process it in the same way as non-late
>> data. Only when the data is "too late" do we drop it.
>>
>> To further clarify, we do not in any way correlate processing-time
>> with event-time. The definition of lateness is only based on event-time and
>> has nothing to do with processing-time. This allows us to do event-time
>> processing with old data streams as well. For example, you may replay
>> 1-week old data as a stream, and the processing will be exactly the same as
>> it would have been if you had processed the stream in real-time a week ago.
>> This is fundamentally necessary for achieving the deterministic processing
>> that Structured Streaming guarantees.
>>
>> Regarding the picture, the "time" is actually "event-time". My apologies
>> for not making this clear in the picture. In hindsight, the picture can be
>> made much better.  :)
>>
>> Hope this explanation helps!
>>
>> TD
>>
>> On Tue, Feb 27, 2018 at 2:26 AM, kant kodali  wrote:
>>
>>> I read through the spark structured streaming documentation and I wonder
>>> how does spark structured streaming determine an event has arrived late?
>>> Does it compare the event-time with the processing time?
>>>
>>> [image: enter image description here]
>>> 
>>>
>>> Taking the above picture as an example Is the bold right arrow line
>>> "Time" represent processing time? If so
>>>
>>> 1) where does this processing time come from? since its streaming Is it
>>> assuming someone is likely using an upstream source that has processing
>>> timestamp in it or spark adds a processing timestamp field? For example,
>>> when reading messages from Kafka we do something like
>>>
>>> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>>>
>>> This dataframe has timestamp column by default which I am assuming is
>>> the processing time. correct? If so, Does Kafka or Spark add this timestamp?
>>>
>>> 2) I can see there is a time comparison between bold right arrow line
>>> and time in the message. And is that how spark determines an event is late?
>>>
>>
>>
>


Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread naresh Goud
Hi Kant,

TD's explanation makes a lot of sense. Refer to this Stack Overflow answer, where it
was explained with program output.  Hope this helps.

https://stackoverflow.com/questions/45579100/structured-streaming-watermark-vs-exactly-once-semantics




Thanks,
Naresh
www.linkedin.com/in/naresh-dulam
http://hadoopandspark.blogspot.com/


On Tue, Feb 27, 2018 at 7:45 PM, Tathagata Das wrote:

> Let me answer the original question directly, that is, how do we determine
> that an event is late. We simply track the maximum event time the engine
> has seen in the data it has processed till now. And any data that has event
> time less than the max is basically "late" (as it is out-of-order). Now, in
> a distributed setting, it is very hard to define whether each record is
> late or not, because it is hard to have a consistent definition of
> max-event-time-seen. Fortunately, we don't have to do this precisely because
> we don't really care whether a record is "late"; we only care whether a
> record is "too late", that is, older than the watermark
> (= max-event-time-seen - watermark-delay). As the programming guide says, if
> data is "late" but not "too late" we process it in the same way as non-late
> data. Only when the data is "too late" do we drop it.
>
> To further clarify, we do not in any way correlate processing-time with
> event-time. The definition of lateness is only based on event-time and has
> nothing to do with processing-time. This allows us to do event-time
> processing with old data streams as well. For example, you may replay
> 1-week old data as a stream, and the processing will be exactly the same as
> it would have been if you had processed the stream in real-time a week ago.
> This is fundamentally necessary for achieving the deterministic processing
> that Structured Streaming guarantees.
>
> Regarding the picture, the "time" is actually "event-time". My apologies
> for not making this clear in the picture. In hindsight, the picture can be
> made much better.  :)
>
> Hope this explanation helps!
>
> TD
>
> On Tue, Feb 27, 2018 at 2:26 AM, kant kodali  wrote:
>
>> I read through the spark structured streaming documentation and I wonder
>> how does spark structured streaming determine an event has arrived late?
>> Does it compare the event-time with the processing time?
>>
>> [image: enter image description here]
>> 
>>
>> Taking the above picture as an example Is the bold right arrow line
>> "Time" represent processing time? If so
>>
>> 1) where does this processing time come from? since its streaming Is it
>> assuming someone is likely using an upstream source that has processing
>> timestamp in it or spark adds a processing timestamp field? For example,
>> when reading messages from Kafka we do something like
>>
>> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>>
>> This dataframe has timestamp column by default which I am assuming is the
>> processing time. correct? If so, Does Kafka or Spark add this timestamp?
>>
>> 2) I can see there is a time comparison between bold right arrow line and
>> time in the message. And is that how spark determines an event is late?
>>
>
>


Re: How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread Tathagata Das
Let me answer the original question directly, that is, how do we determine
that an event is late. We simply track the maximum event time the engine
has seen in the data it has processed till now. And any data that has event
time less than the max is basically "late" (as it is out-of-order). Now, in
a distributed setting, it is very hard to define whether each record is
late or not, because it is hard to have a consistent definition of
max-event-time-seen. Fortunately, we don't have to do this precisely because
we don't really care whether a record is "late"; we only care whether a
record is "too late", that is, older than the watermark
(= max-event-time-seen - watermark-delay). As the programming guide says, if
data is "late" but not "too late" we process it in the same way as non-late
data. Only when the data is "too late" do we drop it.
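
For concreteness, here is a minimal sketch of a watermarked event-time
aggregation (the rate source and the 10-minute delay are just illustrative):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

object WatermarkSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("watermark-sketch").master("local[2]").getOrCreate()
    import spark.implicits._

    // The built-in "rate" source emits (timestamp, value); timestamp serves as the event time here.
    val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    // Watermark = max event time seen so far minus 10 minutes. Rows older than the
    // watermark are dropped; late-but-not-too-late rows still update their window.
    val counts = events
      .withWatermark("timestamp", "10 minutes")
      .groupBy(window($"timestamp", "5 minutes"))
      .count()

    counts.writeStream
      .outputMode("update")
      .format("console")
      .start()
      .awaitTermination()
  }
}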

To further clarify, we do not in any way correlate processing-time with
event-time. The definition of lateness is only based on event-time and has
nothing to do with processing-time. This allows us to do event-time
processing with old data streams as well. For example, you may replay
1-week old data as a stream, and the processing will be exactly the same as
it would have been if you had processed the stream in real-time a week ago.
This is fundamentally necessary for achieving the deterministic processing
that Structured Streaming guarantees.

Regarding the picture, the "time" is actually "event-time". My apologies
for not making this clear in the picture. In hindsight, the picture can be
made much better.  :)

Hope this explanation helps!

TD

On Tue, Feb 27, 2018 at 2:26 AM, kant kodali  wrote:

> I read through the spark structured streaming documentation and I wonder
> how does spark structured streaming determine an event has arrived late?
> Does it compare the event-time with the processing time?
>
> [image: enter image description here]
> 
>
> Taking the above picture as an example Is the bold right arrow line "Time"
> represent processing time? If so
>
> 1) where does this processing time come from? since its streaming Is it
> assuming someone is likely using an upstream source that has processing
> timestamp in it or spark adds a processing timestamp field? For example,
> when reading messages from Kafka we do something like
>
> Dataset<Row> kafkadf = spark.readStream().format("kafka").load();
>
> This dataframe has timestamp column by default which I am assuming is the
> processing time. correct? If so, Does Kafka or Spark add this timestamp?
>
> 2) I can see there is a time comparison between bold right arrow line and
> time in the message. And is that how spark determines an event is late?
>


Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Tathagata Das
Unfortunately, exposing Kafka headers is not yet supported in Structured
Streaming. The community is more than welcome to add support for it :)

On Tue, Feb 27, 2018 at 2:51 PM, Karthik Jayaraman wrote:

> Hi all,
>
> I am using Spark 2.2.1 Structured Streaming to read messages from Kafka. I
> would like to know how to access the Kafka headers programmatically. Since
> Kafka message header support was introduced in Kafka 0.11 (
> https://issues.apache.org/jira/browse/KAFKA-4208), is it supported in
> Spark? If yes, can anyone point me to an example?
>
> - Karthik
>


[Beginner] Kafka 0.11 header support in Spark Structured Streaming

2018-02-27 Thread Karthik Jayaraman
Hi all,

I am using Spark 2.2.1 Structured Streaming to read messages from Kafka. I
would like to know how to access the Kafka headers programmatically. Since
Kafka message header support was introduced in Kafka 0.11
(https://issues.apache.org/jira/browse/KAFKA-4208), is it supported in Spark?
If yes, can anyone point me to an example?

- Karthik 

Re: Spark MLlib: Should I call .cache before fitting a model?

2018-02-27 Thread Nick Pentreath
Currently, fit for many (most I think) models will cache the input data.
For LogisticRegression this is definitely the case, so you won't get any
benefit from caching it yourself.
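
If you want to convince yourself, a quick, purely illustrative experiment is to
time fit() with and without an explicit cache on a small synthetic dataset (the
data and timings below are placeholders, not a benchmark):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object CacheFitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cache-fit-sketch").master("local[2]").getOrCreate()

    // Tiny synthetic dataset in the (label, features) shape LogisticRegression expects.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Simple wall-clock timer around an arbitrary block.
    def time[A](label: String)(f: => A): A = {
      val start = System.nanoTime()
      val result = f
      println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
      result
    }

    val lr = new LogisticRegression()
    time("fit without explicit cache") { lr.fit(training) }
    time("fit with explicit cache") { lr.fit(training.cache()) }

    spark.stop()
  }
}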

On Tue, 27 Feb 2018 at 21:25 Gevorg Hari  wrote:

> Imagine that I am training a Spark MLlib model as follows:
>
> val traingData = loadTrainingData(...)
> val logisticRegression = new LogisticRegression()
>
> traingData.cache
> val logisticRegressionModel = logisticRegression.fit(trainingData)
>
> Does the call traingData.cache improve performance at training time, or
> is it not needed?
>
> Does the .fit(...) method for an ML algorithm call cache/unpersist
> internally?
>
>


Re: SizeEstimator

2018-02-27 Thread David Capwell
Thanks for the reply, and sorry for my delayed response; I had to go find the
profile data to look up the class again.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

That class extends SizeEstimator and has a field "map" which buffers the
rows. In my case the buffer was > 1 million rows, so the size check became
costly every time it ran.


This can be reproduced: create a random data set of (string, long), then
group by string (I believe this is what the code did first; there was a
sort later, but that should have been a different stage). Make sure the number
of executors is small (for example, only one), otherwise you are reducing the
size of M for each executor.
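
A rough sketch of such a reproduction (the row count and key cardinality are
made-up numbers, not the ones from our job):

import org.apache.spark.sql.SparkSession
import scala.util.Random

object SizeEstimatorRepro {
  def main(args: Array[String]): Unit = {
    // A single core/executor keeps the per-task buffer as large as possible.
    val spark = SparkSession.builder()
      .appName("size-estimator-repro")
      .master("local[1]")
      .getOrCreate()

    // Random (string, long) pairs with few distinct keys, so each group buffers many rows.
    val pairs = spark.sparkContext
      .parallelize(1 to 5000000)
      .map(i => (s"key-${i % 100}", Random.nextLong()))

    // groupByKey buffers each key's values before spilling, which is where the
    // size estimation runs over large in-memory collections.
    pairs.groupByKey().mapValues(_.size).collect().foreach(println)

    spark.stop()
  }
}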

On Mon, Feb 26, 2018, 10:04 PM 叶先进  wrote:

> What is the type of the buffer you mentioned?
>
>
> On 27 Feb 2018, at 11:46 AM, David Capwell  wrote:
>
> advancedxy, I don't remember the code that well anymore, but what we hit was a
> very simple schema (string, long). The issue is that the buffer had a million of
> these, so SizeEstimator had to keep recalculating the same elements over and over
> again. SizeEstimator was on-CPU about 30% of the time; bounding the buffer got it
> to be < 5% (going off memory, so this may be off).
>
> The class info (the sizes of fields laid out on the heap) is cached for every
> class encountered, so the size info of the same elements would not be recalculated.
> However, for Collection classes (or similar), SizeEstimator will scan all the
> elements in the container (the `next` field in LinkedList, for example).
>
> And arrays are a special case: SizeEstimator will sample an array if
> array.length > ARRAY_SIZE_FOR_SAMPLING (400).
>
> The cost is really (assuming memory access is O(1), which is not true) O(N × M),
> where N is the number of rows in the buffer and M is the size of the schema. My
> case could be solved by not recomputing, which would bring the cost to O(M) since
> the bookkeeping should be constant time. There was logic to delay recalculating
> based off a change in frequency, but that didn't really do much for us; bounding
> and spilling was the bigger win in our case.
>
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu  wrote:
>
>> Thanks David. Another solution is to convert the protobuf object to a byte
>> array; it does speed up SizeEstimator.
>>
>> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell wrote:
>>
>>> This is used to predict the current cost of memory so spark knows to
>>> flush or not. This is very costly for us so we use a flag marked in the
>>> code as private to lower the cost
>>>
>>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no
>>> typo) - how many records before flush
>>>
>>> This lowers the cost because it lets us leave data in young; if we
>>> don't bound, everything gets promoted to old and GC becomes an issue.  This
>>> doesn't solve the fact that the walk is slow, but lowers the cost of GC.
>>> For us we make sure to have spare memory on the system for page cache so
>>> spilling to disk for us is a memory write 99% of the time.  If your host
>>> has less free memory spilling may become more expensive.
>>>
>>>
>>> If the walk is your bottleneck and not GC then I would recommend JOL and
>>> guessing to better predict memory.
>>>
>>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:
>>>
 Hi folks,

 We have a situation where shuffled data is protobuf-based, and
 SizeEstimator is taking a lot of time.

 We have tried to override SizeEstimator to return a constant value,
 which speeds up things a lot.

 My question is: what is the side effect of disabling SizeEstimator? Is it
 just that Spark does memory reallocation, or are there more severe consequences?

 Thanks!

>>>
>>
>


Spark MLlib: Should I call .cache before fitting a model?

2018-02-27 Thread Gevorg Hari
Imagine that I am training a Spark MLlib model as follows:

val traingData = loadTrainingData(...)
val logisticRegression = new LogisticRegression()

traingData.cache
val logisticRegressionModel = logisticRegression.fit(trainingData)

Does the call traingData.cache improve performance at training time, or is
it not needed?

Does the .fit(...) method for an ML algorithm call cache/unpersist
internally?


Suppressing output from Apache Ivy (?) when calling spark-submit with --packages

2018-02-27 Thread Nicholas Chammas
I’m not sure whether this is something controllable via Spark, but when you
call spark-submit with --packages you get a lot of output. Is there any way
to suppress it? Does it come from Apache Ivy?

I posted more details about what I’m seeing on Stack Overflow.

Nick


Re: CATALYST rule join

2018-02-27 Thread Yong Zhang
I don't fully understand your question, but maybe you want to check out this JIRA:
https://issues.apache.org/jira/browse/SPARK-17728, especially the comments
area. There is some discussion there about why a UDF could be executed multiple
times by Spark.

Yong


From: tan shai 
Sent: Tuesday, February 27, 2018 4:19 AM
To: user@spark.apache.org
Subject: Re: CATALYST rule join

Hi,

I need to write a rule to customize the join function using Spark Catalyst 
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a udf on the column called x, this udf returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table :

SELECT EXPLODE(foo(x)) from table2

Where `foo` is a udf that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

case join@Join(left, right, _, Some(condition)) =>

{

val attr = right.outputSet.find(x => x.toString().contains("x"))

val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), 
Seq(attr.last.toAttribute))

val explode = Explode(udf)

val resolvedGenerator = Generate(explode, true,false, qualifier = None, 
udf.references.toSeq, right)

var newRight = Project(resolvedGenerator.output,resolvedGenerator)

Join(left, newRight , Inner,Option(condition))

}
  }
}

But the problem is that the operation `Generate explode` appears many times in 
the physical plan.


Do you have any other ideas? Maybe rewriting the code.

Thank you


2018-02-25 23:08 GMT+01:00 tan shai:
Hi,

I need to write a rule to customize the join function using Spark Catalyst 
optimizer. The objective is to duplicate the second dataset using this process:

- Execute a udf on the column called x, this udf returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table :

SELECT EXPLODE(foo(x)) from table2

Where `foo` is a udf that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

override def apply(plan: LogicalPlan): LogicalPlan = plan transform {

case join@Join(left, right, _, Some(condition)) =>

{

val attr = right.outputSet.find(x => x.toString().contains("x"))

val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), 
Seq(attr.last.toAttribute))

val explode = Explode(udf)

val resolvedGenerator = Generate(explode, true,false, qualifier = None, 
udf.references.toSeq, right)

var newRight = Project(resolvedGenerator.output,resolvedGenerator)

Join(left, newRight , Inner,Option(condition))

}
  }
}

But the problem is that the operation `Generate explode` appears many times in 
the physical plan.


Do you have any other ideas ? Maybe rewriting the code.

Thank you.




Unsubscribe

2018-02-27 Thread purna pradeep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Returns Null when reading data from XML Ask Question

2018-02-27 Thread Sateesh Karuturi
I am trying to parse the data from an XML file through Spark using the
databricks library.

Here is my code:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions
import org.apache.spark.sql.functions.concat_ws
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.plans.logical.With
import java.text.Format
import scala.sys.process._

object printschema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("printschema").setMaster("local")
    conf.set("spark.debug.maxToStringFields", "1000")
    val context = new SparkContext(conf)
    val sqlCotext = new SQLContext(context)
    import sqlCotext.implicits._

    val df = sqlCotext.read.format("com.databricks.spark.xml")
      .option("rowTag", "us-bibliographic-data-application")
      .option("treatEmptyValuesAsNulls", true)
      .load("/Users/praveen/Desktop/ipa0105.xml")

    val q1 = df
      .withColumn("document", $"application-reference.document-id.doc-number".cast(sql.types.StringType))
      .withColumn("document_number", $"application-reference.document-id.doc-number".cast(sql.types.StringType))
      .select("document", "document_number")

    for (l <- q1) {
      val m1 = l.get(0)
      val m2 = l.get(1)
      println(m1, m2)
    }
  }
}


When I run the code in Scala IDE/IntelliJ IDEA it works fine, and here is my
output:

(14789882,14789882)
(14755945,14755945)
(14755919,14755919)
(14755034,14755034)

But when I build a jar and run it using spark-submit, it simply returns null
values.

OUTPUT :

NULL,NULL
NULL,NULL
NULL,NULL


Please help me out.

Thanks in advance.


How does Spark Structured Streaming determine an event has arrived late?

2018-02-27 Thread kant kodali
I read through the Spark Structured Streaming documentation and I wonder:
how does Spark Structured Streaming determine that an event has arrived late?
Does it compare the event-time with the processing time?

[image: enter image description here] 

Taking the above picture as an example, does the bold right arrow line "Time"
represent processing time? If so:

1) Where does this processing time come from? Since it's streaming, is it
assumed that someone is using an upstream source that has a processing
timestamp in it, or does Spark add a processing timestamp field? For example,
when reading messages from Kafka we do something like

Dataset<Row> kafkadf = spark.readStream().format("kafka").load();

This dataframe has a timestamp column by default, which I am assuming is the
processing time. Correct? If so, does Kafka or Spark add this timestamp?

2) I can see there is a time comparison between the bold right arrow line and
the time in the message. Is that how Spark determines an event is late?
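
For reference, a quick way to print the columns the Kafka source exposes (the
broker and topic below are placeholders, and it needs the spark-sql-kafka-0-10
package on the classpath):

import org.apache.spark.sql.SparkSession

object KafkaSchemaSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-schema-sketch").master("local[2]").getOrCreate()

    // Placeholder broker/topic, just to build the streaming DataFrame.
    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()

    // Prints: key, value, topic, partition, offset, timestamp, timestampType.
    kafkaDf.printSchema()
  }
}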


Re: Spark on K8s - using files fetched by init-container?

2018-02-27 Thread Felix Cheung
Yes you were pointing to HDFS on a loopback address...


From: Jenna Hoole 
Sent: Monday, February 26, 2018 1:11:35 PM
To: Yinan Li; user@spark.apache.org
Subject: Re: Spark on K8s - using files fetched by init-container?

Oh, duh. I completely forgot that file:// is a prefix I can use. Up and running 
now :)

Thank you so much!
Jenna

On Mon, Feb 26, 2018 at 1:00 PM, Yinan Li wrote:
OK, it looks like you will need to use 
`file:///var/spark-data/spark-files/flights.csv` instead. The 'file://' scheme 
must be explicitly used as it seems it defaults to 'hdfs' in your setup.

On Mon, Feb 26, 2018 at 12:57 PM, Jenna Hoole wrote:
Thank you for the quick response! However, I'm still having problems.

When I try to look for /var/spark-data/spark-files/flights.csv I get told:

Error: Error in loadDF : analysis error - Path does not exist: 
hdfs://192.168.0.1:8020/var/spark-data/spark-files/flights.csv;

Execution halted

Exception in thread "main" org.apache.spark.SparkUserAppException: User 
application exited with 1

at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

at org.apache.spark.deploy.RRunner.main(RRunner.scala)

And when I try to look for local:///var/spark-data/spark-files/flights.csv, I 
get:

Error in file(file, "rt") : cannot open the connection

Calls: read.csv -> read.table -> file

In addition: Warning message:

In file(file, "rt") :

  cannot open file 'local:///var/spark-data/spark-files/flights.csv': No such 
file or directory

Execution halted

Exception in thread "main" org.apache.spark.SparkUserAppException: User 
application exited with 1

at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

at org.apache.spark.deploy.RRunner.main(RRunner.scala)

I can see from a kubectl describe that the directory is getting mounted.

Mounts:

  /etc/hadoop/conf from hadoop-properties (rw)

  /var/run/secrets/kubernetes.io/serviceaccount from spark-token-pxz79 (ro)

  /var/spark-data/spark-files from download-files (rw)

  /var/spark-data/spark-jars from download-jars-volume (rw)

  /var/spark/tmp from spark-local-dir-0-tmp (rw)

Is there something else I need to be doing in my set up?

Thanks,
Jenna

On Mon, Feb 26, 2018 at 12:02 PM, Yinan Li wrote:
The files specified through --files are localized by the init-container to 
/var/spark-data/spark-files by default. So in your case, the file should be 
located at /var/spark-data/spark-files/flights.csv locally in the container.

On Mon, Feb 26, 2018 at 10:51 AM, Jenna Hoole wrote:
This is probably stupid user error, but I can't for the life of me figure out 
how to access the files that are staged by the init-container.

I'm trying to run the SparkR example data-manipulation.R which requires the 
path to its datafile. I supply the hdfs location via --files and then the full 
hdfs path.


--files hdfs://192.168.0.1:8020/user/jhoole/flights.csv local:///opt/spark/examples/src/main/r/data-manipulation.R hdfs://192.168.0.1:8020/user/jhoole/flights.csv

The init-container seems to load my file.

18/02/26 18:29:09 INFO spark.SparkContext: Added file hdfs://192.168.0.1:8020/user/jhoole/flights.csv at hdfs://192.168.0.1:8020/user/jhoole/flights.csv with timestamp 1519669749519

18/02/26 18:29:09 INFO util.Utils: Fetching hdfs://192.168.0.1:8020/user/jhoole/flights.csv to /var/spark/tmp/spark-d943dae6-9b95-4df0-87a3-9f7978d6d4d2/userFiles-4112b7aa-b9e7-47a9-bcbc-7f7a01f93e38/fetchFileTemp7872615076522023165.tmp

However, I get an error that my file does not exist.

Error in file(file, "rt") : cannot open the connection

Calls: read.csv -> read.table -> file

In addition: Warning message:

In file(file, "rt") :

  cannot open file 'hdfs://192.168.0.1:8020/user/jhoole/flights.csv': No such file or directory

Execution halted

Exception in thread "main" org.apache.spark.SparkUserAppException: User 
application exited with 1

at org.apache.spark.deploy.RRunner$.main(RRunner.scala:104)

at org.apache.spark.deploy.RRunner.main(RRunner.scala)

If I try supplying just flights.csv, I get a different error

--files hdfs://192.168.0.1:8020/user/jhoole/flights.csv local:///opt/spark/examples/src/main/r/data-manipulation.R flights.csv


Error: Error in loadDF : analysis error - Path does not exist: 

Re: CATALYST rule join

2018-02-27 Thread tan shai
 Hi,

I need to write a rule to customize the join function using Spark Catalyst
optimizer. The objective is to duplicate the second dataset using this
process:

- Execute a udf on the column called x, this udf returns an array

- Execute an explode function on the new column

Using SQL terms, my objective is to execute this query on the second table :

SELECT EXPLODE(foo(x)) from table2

Where `foo` is a udf that returns an array of elements.

I have this rule:

case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case join @ Join(left, right, _, Some(condition)) => {
      val attr = right.outputSet.find(x => x.toString().contains("x"))

      val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType), Seq(attr.last.toAttribute))

      val explode = Explode(udf)

      val resolvedGenerator = Generate(explode, true, false, qualifier = None, udf.references.toSeq, right)

      var newRight = Project(resolvedGenerator.output, resolvedGenerator)

      Join(left, newRight, Inner, Option(condition))
    }
  }
}

But the problem is that the operation `Generate explode` appears many times
in the physical plan.


Do you have any other ideas? Maybe rewriting the code.

Thank you
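
For comparison, here is a rough sketch of the same transformation written
directly with the DataFrame API instead of a Catalyst rule (the column names,
sample data and the body of foo are only placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, udf}

object ExplodeBeforeJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explode-before-join").master("local[2]").getOrCreate()
    import spark.implicits._

    val table1 = Seq((1L, "a"), (2L, "b")).toDF("x", "payload")
    val table2 = Seq((1L, "t2-row1"), (2L, "t2-row2")).toDF("x", "other")

    // Stand-in for the real foo(): returns the original value plus some neighbours.
    val foo = udf((x: Long) => Seq(x, x + 1, x - 1))

    // Roughly: SELECT EXPLODE(foo(x)) AS x, other FROM table2
    val newRight = table2.select(explode(foo(col("x"))).as("x"), col("other"))

    table1.join(newRight, "x").show()
  }
}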


2018-02-25 23:08 GMT+01:00 tan shai :

> Hi,
>
> I need to write a rule to customize the join function using Spark Catalyst
> optimizer. The objective to duplicate the second dataset using this
> process:
>
> - Execute a udf on the column called x, this udf returns an array
>
> - Execute an explode function on the new column
>
> Using SQL terms, my objective is to execute this query on the second table
> :
>
> SELECT EXPLODE(foo(x)) from table2
>
> Where `foo` is is a udf that return an array of elements.
>
> I have this rule:
>
> case class JoinRule(spark: SparkSession) extends Rule[LogicalPlan] {
>
> override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>
> case join@Join(left, right, _, Some(condition)) =>
>
> {
>
> val attr = right.outputSet.find(x => x.toString().contains("x"))
>
> val udf = ScalaUDF((x: Long) => x +: f(ipix), ArrayType(LongType),
> Seq(attr.last.toAttribute))
>
> val explode = Explode(udf)
>
> val resolvedGenerator = Generate(explode, true,false, qualifier =
> None, udf.references.toSeq, right)
>
> var newRight = Project(resolvedGenerator.output,resolvedGenerator)
>
> Join(left, newRight , Inner,Option(condition))
>
> }
>   }
> }
>
> But the problem is that the operation `Generate explode` appears many
> times in the physical plan.
>
>
> Do you have any other ideas ? Maybe rewriting the code.
>
> Thank you.
>
>