Re: RDD order preservation through transformations

2017-09-13 Thread Suzen, Mehmet
I think this is one of the conceptual differences in Spark compared to
other languages: there is no indexing in plain RDDs. This was the
thread with Ankit:

Yes. So order preservation cannot be guaranteed in the case of
failure. I am also not sure whether the partitions themselves are ordered. Can you
get the same sequence of partitions in mapPartitions?

On 13 Sep 2017 19:54, "Ankit Maloo"  wrote:
>
> Rdd are fault tolerant as it can be recomputed using DAG without storing the 
> intermediate RDDs.
>
> On 13-Sep-2017 11:16 PM, "Suzen, Mehmet"  wrote:
>>
>> But what happens if one of the partitions fails? How does fault tolerance recover
>> elements in the other partitions?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: compile error: No classtag available while calling RDD.zip()

2017-09-13 Thread bluejoe
Thanks for your reply!

Actually, it is OK when I use RDD.zip() like this:

def zipDatasets(m: Dataset[String], n: Dataset[Int]) = {
  m.sparkSession.createDataset(m.rdd.zip(n.rdd));
}


But in my project, the Dataset element types are chosen by the caller, so I
introduce type parameters X and Y:


def zipDatasets[X: Encoder, Y: Encoder](m: Dataset[X], n: Dataset[Y]) = {
  m.sparkSession.createDataset(m.rdd.zip(n.rdd));
}


It reports an error because Y is unknown to the compiler, while the compiler needs
ClassTag information for Y.
Now I have no idea how to fix it.

Regards,
bluejoe
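For reference, one possible way out of the Encoder-versus-ClassTag conflict discussed above is to require both context bounds on Y and build the tuple encoder explicitly with Encoders.tuple; this is only an untested sketch, not code from the thread:

import scala.reflect.ClassTag
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Sketch: Y carries both an Encoder (for createDataset) and a ClassTag (for RDD.zip).
def zipDatasets[X: Encoder, Y: Encoder: ClassTag](spark: SparkSession,
                                                  m: Dataset[X],
                                                  n: Dataset[Y]): Dataset[(X, Y)] = {
  val zipped = m.rdd.zip(n.rdd)                // RDD.zip needs ClassTag[Y]
  implicit val pairEncoder: Encoder[(X, Y)] =
    Encoders.tuple(implicitly[Encoder[X]], implicitly[Encoder[Y]])
  spark.createDataset(zipped)                  // createDataset needs Encoder[(X, Y)]
}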

From:  Anastasios Zouzias
Reply-To:  
Date:  Thursday, September 14, 2017, 2:10 AM
To:  bluejoe
Cc:  user
Subject:  Re: compile error: No classtag available while calling RDD.zip()

Hi there,

If it is OK with you to work with DataFrames, you can do

https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, IntegerType, LongType}

val df = sc.parallelize(Seq(
  (1.0, 2.0), (0.0, -1.0),
  (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Append "rowid" column of type Long
val schema = df.schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex

// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(rddWithId.map { case (row, index) =>
  Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

// Show results
dfZippedWithId.show

Best,
Anastasios



On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏  wrote:
Hello. Since Dataset has no zip(..) method, I wrote the following code to zip
two datasets:

1   def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: 
Dataset[X], n: Dataset[Y]) = {
2   val rdd = m.rdd.zip(n.rdd);
3   import spark.implicits._
4   spark.createDataset(rdd);
5   }

However, the m.rdd.zip(…) call reports a compile error:   No ClassTag
available for Y

I know this error can be fixed when I declare Y as a ClassTag like this:

1   def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …

But this makes line 5 report a new error:
Unable to find encoder for type stored in a Dataset.

Now I have no idea how to solve this problem. How can I declare Y as both an Encoder
and a ClassTag?

Many thanks!

Best regards,
bluejoe
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




-- 
-- Anastasios Zouzias



Re-sharded kinesis stream starts generating warnings after kinesis shard numbers were doubled

2017-09-13 Thread Mikhailau, Alex
Has anyone seen the following warnings in the log after a kinesis stream has 
been re-sharded?

com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask

WARN Cannot get the shard for this ProcessTask, so duplicate KPL user records 
in the event of resharding will not be dropped during deaggregation of Amazon 
Kinesis records.


com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy

WARN Cannot find the shard given the shardId shardId-0599




how sequence of chained jars in spark.(driver/executor).extraClassPath matters

2017-09-13 Thread Richard Xin
So let's say I have a chained path in
spark.driver.extraClassPath/spark.executor.extraClassPath, such as
/path1/*:/path2/*, and I have different versions of the same jar under those two
directories. How does Spark pick which version of the jar to use? The one from /path1/*?

Thanks.
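As far as I know, the JVM resolves a class from the first classpath entry that contains it, so for a class present in both directories the copy under /path1/* should normally win. A quick way to verify which jar was actually loaded (the class name below is a placeholder for one that exists in both jars):

// Run in spark-shell on the driver, or inside a task on an executor.
val clazz = Class.forName("com.example.SomeSharedClass")      // placeholder class name
println(clazz.getProtectionDomain.getCodeSource.getLocation)  // prints the jar it came from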

Should I use Dstream or Structured Stream to transfer data from source to sink and then back from sink to source?

2017-09-13 Thread kant kodali
Hi All,

I am trying to read data from Kafka, insert it into Mongo, read it back from Mongo,
and insert it back into Kafka. I went with the Structured Streaming approach first;
however, I believe I am making some naive error because my map operations
are not getting invoked.

The pseudo code looks like this:

Dataset resultDataSet = jsonDataset.mapPartitions(
insertIntoMongo).mapPartitions(readFromMongo);

StreamingQuery query =
resultDataSet.trigger(ProcessingTime(1000)).format("kafka").start();

query.awaitTermination();

The mapPartitions in this code is not getting executed. Is this because I
am not calling any action on my streaming dataset? In the DStream case, I
used to call foreachRDD and it worked well. So how do I do this using
structured streaming?

Thanks!
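For what it is worth, in Structured Streaming the mapPartitions calls are lazy and only run once a query is started through writeStream, so there is no separate action to call. A rough Scala sketch of the overall shape; the broker addresses, topics, checkpoint path and the two Mongo helpers are placeholders, not code from this pipeline:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-mongo-kafka").getOrCreate()
import spark.implicits._

// Placeholder helpers: write a partition to Mongo / read the corresponding data back.
def insertIntoMongo(rows: Iterator[String]): Iterator[String] = rows
def readFromMongo(rows: Iterator[String]): Iterator[String] = rows

val source = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("subscribe", "input-topic")                 // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
  .as[String]

val result = source.mapPartitions(insertIntoMongo).mapPartitions(readFromMongo)

// The Kafka sink expects a string/binary column named "value".
val query = result.toDF("value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
  .option("topic", "output-topic")                    // placeholder
  .option("checkpointLocation", "/tmp/checkpoints")   // placeholder
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()

query.awaitTermination()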


Re: compile error: No classtag available while calling RDD.zip()

2017-09-13 Thread Anastasios Zouzias
Hi there,

If it is OK with you to work with DataFrames, you can do

https://gist.github.com/zouzias/44723de11222535223fe59b4b0bc228c

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType,
LongType}
val df = sc.parallelize(Seq(
(1.0, 2.0), (0.0, -1.0),
(3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
// Append "rowid" column of type Long
val schema = df.schema
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid",
LongType, false)))
// Zip on RDD level
val rddWithId = df.rdd.zipWithIndex
// Convert back to DataFrame
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index)
=> Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
// Show results
dfZippedWithId.show

Best,
Anastasios



On Wed, Sep 13, 2017 at 5:07 PM, 沈志宏  wrote:

> Hello. Since Dataset has no zip(..) method, I wrote the following code to
> zip two datasets:
>
> 1   def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m:
> Dataset[X], n: Dataset[Y]) = {
> 2   val rdd = m.rdd.zip(n.rdd);
> 3   import spark.implicits._
> 4   spark.createDataset(rdd);
> 5   }
>
> However, the m.rdd.zip(…) call reports a compile error:   No
> ClassTag available for Y
>
> I know this error can be fixed when I declare Y as a ClassTag like
> this:
>
> 1   def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …
>
> But this makes line 5 report a new error:
> Unable to find encoder for type stored in a Dataset.
>
> Now I have no idea how to solve this problem. How can I declare Y as both an
> Encoder and a ClassTag?
>
> Many thanks!
>
> Best regards,
> bluejoe
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
-- Anastasios Zouzias



Re: RDD order preservation through transformations

2017-09-13 Thread lucas.g...@gmail.com
I'm wondering why you need order preserved. We've had situations where
keeping the source as an artificial field in the dataset was important, and
I had to go through contortions to inject that (in that case the data source had no
unique key).

Is this similar?

On 13 September 2017 at 10:46, Suzen, Mehmet  wrote:

> But what happens if one of the partitions fails? How does fault tolerance
> recover elements in the other partitions?
>
> On 13 Sep 2017 18:39, "Ankit Maloo"  wrote:
>
>> AFAIK, the order of an RDD is maintained within a partition for map
>> operations. There is no way a map operation can change the sequence within a
>> partition, as a partition is local and computation happens one record at a
>> time.
>>
>> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:
>>
>> I think order has no meaning in RDDs; see this post, especially the zip
>> methods:
>> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>


Re: RDD order preservation through transformations

2017-09-13 Thread Suzen, Mehmet
But what happens if one of the partitions fails? How does fault tolerance recover
elements in the other partitions?

On 13 Sep 2017 18:39, "Ankit Maloo"  wrote:

> AFAIK, the order of an RDD is maintained within a partition for map
> operations. There is no way a map operation can change the sequence within a
> partition, as a partition is local and computation happens one record at a
> time.
>
> On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:
>
> I think order has no meaning in RDDs; see this post, especially the zip
> methods:
> https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>


Re: RDD order preservation through transformations

2017-09-13 Thread Ankit Maloo
AFAIK, the order of an RDD is maintained within a partition for map
operations. There is no way a map operation can change the sequence within a
partition, as a partition is local and computation happens one record at a
time.

On 13-Sep-2017 9:54 PM, "Suzen, Mehmet"  wrote:

I think order has no meaning in RDDs; see this post, especially the zip
methods:
https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Chaining Spark Streaming Jobs

2017-09-13 Thread Sunita Arvind
Thanks for your suggestion, Vincent. I do not have much experience with Akka
as such. I will explore this option.
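For reference, a rough sketch of the file-sink / file-source chaining Michael suggests further down this thread; the topics, paths and schema are illustrative placeholders rather than values from this project:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("chained-jobs").getOrCreate()

// Job 1: persist the raw stream with the file sink; its checkpoint gives exactly-once writes.
val raw = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")     // placeholder
  .option("subscribe", "raw-logs")                      // placeholder
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

raw.writeStream.format("parquet")
  .option("path", "s3://bucket/raw/")                   // placeholder
  .option("checkpointLocation", "s3://bucket/chk-raw/") // placeholder
  .start()

// Job 2: a downstream streaming job reads the same directory as a file source.
// File sources require an explicit schema.
val rawSchema = StructType(Seq(StructField("value", StringType)))
val staged = spark.readStream.schema(rawSchema).parquet("s3://bucket/raw/")
// ... enrich or aggregate 'staged' and write it out with its own checkpoint.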

On Tue, Sep 12, 2017 at 11:01 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> What about chaining with Akka or Akka Streams and the fair scheduler?
>
> Le 13 sept. 2017 01:51, "Sunita Arvind"  a écrit :
>
> Hi Michael,
>
> I am wondering what I am doing wrong. I get an error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
> at org.apache.spark.sql.execution.datasources.DataSource.
> sourceSchema(DataSource.scala:223)
> at org.apache.spark.sql.execution.datasources.DataSource.
> sourceInfo$lzycompute(DataSource.scala:87)
> at org.apache.spark.sql.execution.datasources.DataSource.
> sourceInfo(DataSource.scala:87)
> at org.apache.spark.sql.execution.streaming.StreamingRelation$.
> apply(StreamingRelation.scala:30)
> at org.apache.spark.sql.streaming.DataStreamReader.load(
> DataStreamReader.scala:125)
> at org.apache.spark.sql.streaming.DataStreamReader.load(
> DataStreamReader.scala:134)
> at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregate
> s.scala:23)
> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
> ssorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
> thodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.
> java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation =
>     """select sum(col1), sum(col2), id, first(name)
>       from enrichedtb
>       group by id
>     """.stripMargin
>
>   def aggregator(conf: Config) = {
>     implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>     implicit val sqlctx = spark.sqlContext
>     printf("Source path is" + conf.getString("source.path"))
>     val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // Added this as it was complaining about schema.
>     val df = spark.readStream.format("parquet").option("inferSchema", true).schema(schemadf.schema).load(conf.getString("source.path"))
>     df.createOrReplaceTempView("enrichedtb")
>     val res = spark.sql(aggregation)
>     res.writeStream.format("parquet").outputMode("append").option("checkpointLocation", conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
>     val mainconf = ConfigFactory.load()
>     val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>     print(conf.toString)
>     aggregator(conf)
>   }
> }
>
>
> I tried to extract schema from static read of the input path and provided it 
> to the readStream API. With that, I get this error:
>
> at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>   at 
> org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>   at 
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>   at 
> org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster, all paths point to S3. On my laptop, they
> all point to the local filesystem.
>
> I am using Spark 2.2.0
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) 

Re: RDD order preservation through transformations

2017-09-13 Thread Suzen, Mehmet
I think order has no meaning in RDDs; see this post, especially the zip methods:
https://stackoverflow.com/questions/29268210/mind-blown-rdd-zip-method

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RDD order preservation through transformations

2017-09-13 Thread johan.grande.ext
Hi,

I'm a beginner using Spark with Scala and I'm having trouble understanding 
ordering in RDDs. I understand that RDDs are ordered (as they can be sorted) 
but that some transformations don't preserve order.

How can I know which transformations preserve order and which don't? Regarding 
map, for instance, this StackOverflow answer says map preserves order but this 
answer on this ML implies it doesn't. The scaladoc doesn't say explicitly. 
Which is it?

https://stackoverflow.com/a/31525843
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-ordering-gets-scrambled-tp5062p6482.html
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

-- 
Johan Grande
Sopra Steria for Orange
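One way to side-step the uncertainty is to pin the order explicitly with zipWithIndex and sort by that index wherever order matters again; a minimal spark-shell sketch with made-up values:

val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)

// Record each element's current position before transforming.
val indexed = rdd.zipWithIndex().map { case (value, idx) => (idx, value.toUpperCase) }

// Sort by the recorded index to recover the original order after transformations
// that may move data around.
val restored = indexed.sortByKey().values
restored.collect().foreach(println)   // A, B, C, D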


_

Ce message et ses pieces jointes peuvent contenir des informations 
confidentielles ou privilegiees et ne doivent donc
pas etre diffuses, exploites ou copies sans autorisation. Si vous avez recu ce 
message par erreur, veuillez le signaler
a l'expediteur et le detruire ainsi que les pieces jointes. Les messages 
electroniques etant susceptibles d'alteration,
Orange decline toute responsabilite si ce message a ete altere, deforme ou 
falsifie. Merci.

This message and its attachments may contain confidential or privileged 
information that may be protected by law;
they should not be distributed, used or copied without authorisation.
If you have received this email in error, please notify the sender and delete 
this message and its attachments.
As emails may be altered, Orange is not liable for messages that have been 
modified, changed or falsified.
Thank you.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Minimum cost flow problem solving in Spark

2017-09-13 Thread Michael Malak
You might be interested in "Maximum Flow implementation on Spark GraphX" done 
by a Colorado School of Mines grad student a couple of years ago.


http://datascienceassn.org/2016-01-27-maximum-flow-implementation-spark-graphx


From: Swapnil Shinde 
To: user@spark.apache.org; d...@spark.apache.org 
Sent: Wednesday, September 13, 2017 9:41 AM
Subject: Minimum cost flow problem solving in Spark



Hello
Has anyone used Spark to solve minimum cost flow problems? I am
quite new to combinatorial optimization algorithms, so any help, suggestions, or
libraries are very much appreciated.

Thanks
Swapnil

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Minimum cost flow problem solving in Spark

2017-09-13 Thread Swapnil Shinde
Hello
Has anyone used Spark to solve minimum cost flow problems? I
am quite new to combinatorial optimization algorithms, so any help,
suggestions, or libraries are very much appreciated.

Thanks
Swapnil


HiveThriftserver does not seem to respect partitions

2017-09-13 Thread Yana Kadiyska
Hi folks, I have created a table in the following manner:

CREATE EXTERNAL TABLE IF NOT EXISTS  rum_beacon_partition (
  list of columns

)

COMMENT 'User Infomation'

PARTITIONED BY (account_id String,

product String,

group_id String,

year String,

month String,

day String)

STORED AS PARQUET

LOCATION '/stream2store/nt_tp_collation'


I then ran MSCK REPAIR TABLE to generate the partition information.


I think partitions got generated correctly -- here is a query and its
output:


"show table extended like 'rum_beacon_partition'
partition(account_id='',product='rum',group_id='',year='2017',month='09',day='12')


 location:ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation
/account_id=/product=rum/group_id= /year=2017/month=09/day=12

However, it does appear that when I issue a SQL query, the predicates do
not correctly limit the files touched:

explain extended select uri from rum_beacon_partition where
account_id='' and product='rum' and group_id='' and year='2017' and
month='09' limit 2

Produces output that seems to indicate that every file is being touched
(unless I'm misreading the output). It also crashes my filesystem so I
suspect there is some truth to it.

Optimized logical plan looks fine I think:

== Optimized Logical Plan == |

| Limit 2 |

| Project [uri#16519] |

| Filter (account_id#16511 = ) && (product#16512 = rum)) &&
(group_id#16513 = )) && (year#16514 = 2017)) && (month#16515 = 09)) |


But in the physical plan it seems that a ton of files are touched (both in
account and date partitions)

Scan
ParquetRelation[ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=16,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=17,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=18,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=19,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=20,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=21,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=22,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=23,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=24,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=25,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=26,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=27,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=28,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=29,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=30,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=05/day=31,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=06/day=01,ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=06/day=02


I am hoping someone can offer debugging tips / advice on what to look for
in the logs. I'm on a pretty old version of Spark (1.5.2) but this seems
like something that I'm doing wrong.
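Not a fix for the pruning itself, but as a sanity check or stop-gap you can point the reader straight at a partition directory (path taken from the 'show table extended' output above), so only that directory's files are listed:

val day = sqlContext.read.parquet(
  "ceph://ceph-mon.service.consul:6789/stream2store/nt_tp_collation/account_id=/product=rum/group_id=/year=2017/month=09/day=12")
day.select("uri").show(2)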


compile error: No classtag available while calling RDD.zip()

2017-09-13 Thread 沈志宏
Hello. Since Dataset has no zip(..) method, I wrote the following code to zip
two datasets:

1   def zipDatasets[X: Encoder, Y: Encoder](spark: SparkSession, m: 
Dataset[X], n: Dataset[Y]) = {
2   val rdd = m.rdd.zip(n.rdd);
3   import spark.implicits._
4   spark.createDataset(rdd);
5   }

However, the m.rdd.zip(…) call reports a compile error:   No ClassTag
available for Y

I know this error can be fixed when I declare Y as a ClassTag like this:

1   def foo[X: Encoder, Y: ClassTag](spark: SparkSession, …

But this makes line 5 report a new error:
Unable to find encoder for type stored in a Dataset.

Now I have no idea how to solve this problem. How can I declare Y as both an Encoder
and a ClassTag?

Many thanks!

Best regards,
bluejoe
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[Structured Streaming] Multiple sources best practice/recommendation

2017-09-13 Thread JG Perrin
Hi,

I have different files being dumped on S3; I want to ingest them and join them.

What sounds better to you: one "directory" for all, or one per file format?

If I have one directory for all, can I get some metadata about the file, like
its name?

If I have multiple directories, how can I have multiple "listeners"?

Thanks

jg
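For what it is worth, a rough sketch of both layouts; the schema and paths are placeholders, and input_file_name() is one way to recover the source file per row:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val spark = SparkSession.builder().appName("s3-ingest").getOrCreate()
val schema = StructType(Seq(StructField("id", StringType), StructField("payload", StringType)))

// One landing directory for everything: tag each row with the file it came from.
val all = spark.readStream.schema(schema).json("s3://bucket/landing/")           // placeholder path
  .withColumn("source_file", input_file_name())

// One directory per format: one readStream (and therefore one listening query) per directory.
val fromCsv  = spark.readStream.schema(schema).csv("s3://bucket/landing-csv/")   // placeholder path
val fromJson = spark.readStream.schema(schema).json("s3://bucket/landing-json/") // placeholder path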

__
This electronic transmission and any documents accompanying this electronic 
transmission contain confidential information belonging to the sender.  This 
information may contain confidential health information that is legally 
privileged.  The information is intended only for the use of the individual or 
entity named above.  The authorized recipient of this transmission is 
prohibited from disclosing this information to any other party unless required 
to do so by law or regulation and is required to delete or destroy the 
information after its stated need has been fulfilled.  If you are not the 
intended recipient, you are hereby notified that any disclosure, copying, 
distribution or the taking of any action in reliance on or regarding the 
contents of this electronically transmitted information is strictly prohibited. 
 If you have received this E-mail in error, please notify the sender and delete 
this message immediately.


[Spark Dataframe] How can I write a correct filter so the Hive table partitions are pruned correctly

2017-09-13 Thread Patrick Duin
Hi Spark users,

I've got an issue where I wrote a filter on a Hive table using dataframes
and despite setting:
spark.sql.hive.metastorePartitionPruning=true no partitions are being
pruned.

In short:

Doing this: table.filter("partition=x or partition=y") will result in Spark
fetching all partition metadata from the Hive metastore and doing the
filtering after fetching the partitions.

On the other hand if my filter is "simple":
table.filter("partition=x ")
Spark does a call to the metastore that passes along the filter and fetches
just the ones it needs.

In our case we have a lot of partitions on the table, and the calls that
fetch all the partitions take minutes as well as causing memory
issues. Is this a bug, or is there a better way of doing the filter call?

Thanks,
 Patrick

PS:
Sorry for crossposting I wasn't sure if the user list was the correct place
to ask and I understood to go via stackoverflow first so my question is
also here in more detail:
https://stackoverflow.com/questions/46152526/how-should-i-configure-spark-to-correctly-prune-hive-metastore-partitions
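Not an answer on whether it is a bug, but since a single equality filter does get pushed to the metastore in your tests, one possible workaround is to run the prunable filters separately and union the results, for example:

val partX = table.filter("partition = 'x'")
val partY = table.filter("partition = 'y'")
val both  = partX.union(partY)   // unionAll on older Spark versions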


Re: Chaining Spark Streaming Jobs

2017-09-13 Thread vincent gromakowski
What about chaining with Akka or Akka Streams and the fair scheduler?

Le 13 sept. 2017 01:51, "Sunita Arvind"  a écrit :

Hi Michael,

I am wondering what I am doing wrong. I get an error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must
be specified when creating a streaming source DataFrame. If some files
already exist in the directory, then depending on the file format you may
be able to create a static DataFrame on that directory with
'spark.read.load(directory)' and infer schema from it.
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(
DataSource.scala:223)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$
lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(
DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(
StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.
load(DataStreamReader.scala:125)
at org.apache.spark.sql.streaming.DataStreamReader.
load(DataStreamReader.scala:134)
at com.aol.persist.UplynkAggregates$.aggregator(
UplynkAggregates.scala:23)
at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook


I tried specifying the schema as well.
Here is my code:

object Aggregates {

  val aggregation =
    """select sum(col1), sum(col2), id, first(name)
      from enrichedtb
      group by id
    """.stripMargin

  def aggregator(conf: Config) = {
    implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is" + conf.getString("source.path"))
    val schemadf = sqlctx.read.parquet(conf.getString("source.path")) // Added this as it was complaining about schema.
    val df = spark.readStream.format("parquet").option("inferSchema", true).schema(schemadf.schema).load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)
    res.writeStream.format("parquet").outputMode("append").option("checkpointLocation", conf.getString("checkpointdir")).start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
    val mainconf = ConfigFactory.load()
    val conf = mainconf.getConfig(mainconf.getString("pipeline"))
    print(conf.toString)
    aggregator(conf)
  }
}


I tried to extract schema from static read of the input path and
provided it to the readStream API. With that, I get this error:

at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
at 
org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at 
org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster, all paths point to S3. On my laptop,
they all point to the local filesystem.

I am using Spark 2.2.0

Appreciate your help.

regards

Sunita


On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and