Re: Union of DStream and RDD

2017-02-11 Thread Amit Sela
Not specifically, I want to be able to union any form of DStream/RDD in
general. I'm working on Apache Beam's Spark runner, so the abstraction
there does not distinguish between streaming and batch (kind of like the
Dataset API). Since I wrote my own InputDStream, I will simply stream any
"batch source" instead, because I really don't see a way to union both.

On Sun, Feb 12, 2017 at 6:49 AM Egor Pahomov  wrote:

> Interestingly, I just ran into the same problem. By any chance, do you
> want to process old files in the directory as well as new ones? That is my
> motivation, and checkpointing is my problem as well.
>
> 2017-02-08 22:02 GMT-08:00 Amit Sela :
>
> Not with checkpointing.
>
> On Thu, Feb 9, 2017, 04:58 Egor Pahomov  wrote:
>
> Just guessing here, but would
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
> "*Queue of RDDs as a Stream*" work? Basically, create a DStream from your
> RDD and then union it with the other DStream.
>
> 2017-02-08 12:32 GMT-08:00 Amit Sela :
>
> Hi all,
>
> I'm looking to union a DStream and RDD into a single stream.
> One important note is that the RDD has to be added to the DStream just
> once.
>
> Ideas ?
>
> Thanks,
> Amit
>
>
>
>
>
> --
>
>
> *Sincerely yours, Egor Pakhomov*
>
>
>
>
> --
>
>
> *Sincerely yours, Egor Pakhomov*
>
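For reference, a minimal sketch of the queueStream idea suggested above, assuming
a running StreamingContext `ssc`, an existing `stream: DStream[String]` and a
one-off `batchRdd: RDD[String]` (all names are placeholders; note Amit's caveat
that queue streams do not work with checkpointing):

import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Wrap the one-off RDD in a queue-backed DStream; with oneAtATime = true the
// RDD is emitted in a single batch and the queue is empty afterwards.
def unionOnce(ssc: StreamingContext,
              stream: DStream[String],
              batchRdd: RDD[String]): DStream[String] = {
  val queue = mutable.Queue(batchRdd)
  val onceStream = ssc.queueStream(queue, oneAtATime = true)
  stream.union(onceStream)
}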


Re: Union of DStream and RDD

2017-02-11 Thread Egor Pahomov
Interestingly, I just ran into the same problem. By any chance, do you
want to process old files in the directory as well as new ones? That is my
motivation, and checkpointing is my problem as well.

2017-02-08 22:02 GMT-08:00 Amit Sela :

> Not with checkpointing.
>
> On Thu, Feb 9, 2017, 04:58 Egor Pahomov  wrote:
>
>> Just guessing here, but would
>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources
>> "*Queue of RDDs as a Stream*" work? Basically, create a DStream from your
>> RDD and then union it with the other DStream.
>>
>> 2017-02-08 12:32 GMT-08:00 Amit Sela :
>>
>> Hi all,
>>
>> I'm looking to union a DStream and RDD into a single stream.
>> One important note is that the RDD has to be added to the DStream just
>> once.
>>
>> Ideas ?
>>
>> Thanks,
>> Amit
>>
>>
>>
>>
>>
>> --
>>
>>
>> *Sincerely yours, Egor Pakhomov*
>>
>


-- 


*Sincerely yours, Egor Pakhomov*


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Egor Pahomov
Got it, thanks!

2017-02-11 0:56 GMT-08:00 Sam Elamin :

> Here's a link to the thread
>
> http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-Dropping-Duplicates-td20884.html
>
> On Sat, 11 Feb 2017 at 08:47, Sam Elamin  wrote:
>
>> Hey Egor
>>
>>
>> You can use a ForeachWriter or you can write a custom sink. I personally
>> went with a custom sink since I get a DataFrame per batch:
>>
>> https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala
>>
>> You can have a look at how I implemented something similar to the file
>> sink that, in the event of a failure, skips batches already written.
>>
>>
>> Also have a look at Michael's reply to me a few days ago on exactly the
>> same topic. The email subject was "Structured Streaming - Dropping
>> Duplicates".
>>
>>
>> Regards
>>
>> Sam
>>
>> On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski  wrote:
>>
>> "Something like that". I've never tried it out myself, so I'm only
>> guessing, having had a brief look at the API.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov 
>> wrote:
>> > Jacek, so I create a cache in ForeachWriter, write to it in every
>> > "process()" call, and flush it on close? Something like that?
>> >
>> > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski :
>> >>
>> >> Hi,
>> >>
>> >> Yes, that's ForeachWriter.
>> >>
>> >> Yes, it works with element by element. You're looking for mapPartition
>> >> and ForeachWriter has partitionId that you could use to implement a
>> >> similar thing.
>> >>
>> >> Pozdrawiam,
>> >> Jacek Laskowski
>> >> 
>> >> https://medium.com/@jaceklaskowski/
>> >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> >> Follow me at https://twitter.com/jaceklaskowski
>> >>
>> >>
>> >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov 
>> >> wrote:
>> >> > Jacek, you mean
>> >> >
>> >> > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
>> >> > ? I do not understand how to use it, since it passes every value
>> >> > separately, not every partition. And adding to the table value by
>> >> > value would not work.
>> >> >
>> >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski :
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> Have you considered foreach sink?
>> >> >>
>> >> >> Jacek
>> >> >>
>> >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" 
>> wrote:
>> >> >>>
>> >> >>> Hi, I'm thinking of using Structured Streaming instead of the old
>> >> >>> streaming, but I need to be able to save results to a Hive table.
>> >> >>> The documentation for the file sink
>> >> >>>
>> >> >>> says (http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks):
>> >> >>> "Supports writes to partitioned tables.". But being able to write to
>> >> >>> partitioned directories is not enough to write to the table: someone
>> >> >>> needs to write to the Hive metastore. How can I use Structured
>> >> >>> Streaming and write to a Hive table?
>> >> >>>
>> >> >>> --
>> >> >>> Sincerely yours
>> >> >>> Egor Pakhomov
>> >> >
>> >> >
>> >> >
>> >> >
>> >> > --
>> >> > Sincerely yours
>> >> > Egor Pakhomov
>> >
>> >
>> >
>> >
>> > --
>> > Sincerely yours
>> > Egor Pakhomov
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


-- 


*Sincerely yours, Egor Pakhomov*
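For reference, a rough sketch of the buffering ForeachWriter discussed in this
thread. This is only an illustration of the open/process/close pattern, not a
tested sink; the `batchInsert` callback is a placeholder for whatever actually
writes the batch to Hive, and it must be serializable:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{ForeachWriter, Row}

class BufferingWriter(batchInsert: Seq[Row] => Unit) extends ForeachWriter[Row] {
  private val buffer = ArrayBuffer.empty[Row]

  // Called once per partition and epoch; returning true means "process it".
  override def open(partitionId: Long, version: Long): Boolean = {
    buffer.clear()
    true
  }

  // Called element by element, as Jacek describes; just accumulate here.
  override def process(row: Row): Unit = buffer += row

  // Flush the whole partition in one go when the epoch finishes.
  override def close(errorOrNull: Throwable): Unit = {
    if (errorOrNull == null) batchInsert(buffer)
  }
}

// Usage sketch: df.writeStream.foreach(new BufferingWriter(rows => ...)).start()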


Remove dependence on HDFS

2017-02-11 Thread Benjamin Kim
Does anyone have advice on how to remove the reliance on HDFS for storing
persistent data? We have an on-premise Spark cluster. It seems like a waste of
resources to keep adding nodes only because of a lack of storage space. I would
rather add more powerful nodes less frequently, to address the lack of
processing power, than add less powerful nodes more frequently just to handle
the ever-growing data. Can anyone point me in the right direction? Is Alluxio a
good solution? S3? I would like to hear your thoughts.

Cheers,
Ben 
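For what it is worth, a minimal sketch of the S3 option, assuming the
hadoop-aws and AWS SDK jars are on the classpath and the s3a scheme is used
(bucket names, paths and credential handling are placeholders):

// Credentials can also come from instance profiles or core-site.xml.
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

// Persistent data lives in the bucket instead of HDFS, so compute nodes stay stateless.
val events = spark.read.parquet("s3a://my-bucket/warehouse/events/")
events.write.mode("overwrite").parquet("s3a://my-bucket/warehouse/events_cleaned/")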
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Practical configuration to run LSH in Spark 2.1.0

2017-02-11 Thread Timur Shenkao
Hello,

1) Are you sure that your data is "clean"? No unexpected missing values?
No strings in an unusual encoding? No additional or missing columns?
2) How long does your job run? What about garbage collector parameters?
Have you checked what happens with jconsole / jvisualvm?

Sincerely yours, Timur

On Sat, Feb 11, 2017 at 12:52 AM, nguyen duc Tuan 
wrote:

> Hi Nick,
> Because we use *RandomSignProjectionLSH*, there is only one parameter for
> LSH, the number of hashes. I tried with a small number of hashes (2) but
> the error still happens, and it happens when I call the similarity join.
> After the transformation, the size of the dataset is about 4G.
>
> 2017-02-11 3:07 GMT+07:00 Nick Pentreath :
>
>> What other params are you using for the lsh transformer?
>>
>> Are the issues occurring during transform or during the similarity join?
>>
>>
>> On Fri, 10 Feb 2017 at 05:46, nguyen duc Tuan 
>> wrote:
>>
>>> Hi Das,
>>> In general, I will apply them to larger datasets, so I want to use LSH,
>>> which is more scalable than the approaches you suggested. Have you
>>> tried LSH in Spark 2.1.0 before? If yes, how do you set the
>>> parameters/configuration to make it work?
>>> Thanks.
>>>
>>> 2017-02-10 19:21 GMT+07:00 Debasish Das :
>>>
>>> If it is 7M rows and 700K features (or say 1M features), brute-force row
>>> similarity will run fine as well... check out SPARK-4823... you can
>>> compare quality with the approximate variant...
>>> On Feb 9, 2017 2:55 AM, "nguyen duc Tuan"  wrote:
>>>
>>> Hi everyone,
>>> Since Spark 2.1.0 introduces LSH
>>> (http://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing),
>>> we want to use LSH to find approximate nearest neighbors. Basically, we
>>> have a dataset with about 7M rows. We want to use cosine distance to
>>> measure the similarity between items, so we use *RandomSignProjectionLSH*
>>> (https://gist.github.com/tuan3w/c968e56ea8ef135096eeedb08af097db)
>>> instead of *BucketedRandomProjectionLSH*. I have tried to tune some
>>> configurations such as serialization, memory fraction, executor memory
>>> (~6G), number of executors (~20), memory overhead, ..., but nothing works.
>>> I often get the error "java.lang.OutOfMemoryError: Java heap space" while
>>> running. I know that this implementation was done by an engineer at Uber,
>>> but I don't know the right configuration to run the algorithm at scale. Do
>>> they need very large memory to run it?
>>>
>>> Any help would be appreciated.
>>> Thanks
>>>
>>>
>>>
>
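For reference, a sketch of the Spark 2.1 similarity-join pattern using the
built-in BucketedRandomProjectionLSH. This uses Euclidean distance, so it only
illustrates the API shape, not the custom RandomSignProjectionLSH from the
gist; column names and parameter values are assumptions:

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.sql.DataFrame

def approxSelfJoin(features: DataFrame, threshold: Double): DataFrame = {
  val lsh = new BucketedRandomProjectionLSH()
    .setInputCol("features")   // Vector column holding the item features
    .setOutputCol("hashes")
    .setNumHashTables(2)       // a small number of hashes, as tried in the thread
    .setBucketLength(4.0)      // placeholder; needs tuning per dataset
  val model = lsh.fit(features)
  // The join, rather than the transform, is usually where memory pressure shows up.
  model.approxSimilarityJoin(features, features, threshold).toDF()
}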


Re: Strange behavior with 'not' and filter pushdown

2017-02-11 Thread Everett Anderson
On the plus side, looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
  +- *Project
 +- *Filter NOT isnotnull(username#14)
+- *FileScan parquet [username#14] Batched: true, Format:
Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
ReadSchema: struct



On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson  wrote:

> Bumping this thread.
>
> Translating "where not(username is not null)" into a filter of  
> [IsNotNull(username),
> Not(IsNotNull(username))] seems like a rather severe bug.
>
> Spark 1.6.2:
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
> output=[_c0#1822L])
> +- TungstenExchange SinglePartition, None
>  +- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[count#1825L])
>  +- Project
>  +- Filter NOT isnotnull(username#1590)
>  +- Scan ParquetRelation[username#1590] InputPaths: ,
> PushedFilters: [Not(IsNotNull(username))]
>
> Spark 2.0.2
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>  +- *HashAggregate(keys=[], functions=[partial_count(1)])
>  +- *Project
>  +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>  +- *BatchedScan parquet default.[username#35] Format:
> ParquetFormat, InputPaths: , PartitionFilters: [],
> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
> ReadSchema: struct
>
> Example to generate the above:
>
> // Create some fake data
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
> Row(1, "fred"),
> Row(2, "amy"),
> Row(3, null)))
>
> val schema = StructType(Seq(
> StructField("id", IntegerType, nullable = true),
> StructField("username", StringType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
> val path = "SOME PATH HERE"
>
> data.write.mode("overwrite").parquet(path)
>
> val testData = sqlContext.read.parquet(path)
>
> testData.registerTempTable("filter_test_table")
>
>
> %sql
> explain select count(*) from filter_test_table where not( username is not
> null)
>
>
> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas  > wrote:
>
>> Hi,
>>
>> I have an application where I’m filtering data with SparkSQL with simple
>> WHERE clauses. I also want the ability to show the unmatched rows for any
>> filter, and so am wrapping the previous clause in `NOT()` to get the
>> inverse. Example:
>>
>> Filter:  username is not null
>> Inverse filter:  NOT(username is not null)
>>
>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>> inverse filter always returns zero results. It looks like this is a problem
>> with how the filter is getting pushed down to Parquet. Specifically, the
>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>> which would obviously result in zero matches. An example below:
>>
>> pyspark:
>> > x = spark.sql('select my_id from my_table where username is not null')
>> > y = spark.sql('select my_id from my_table where not(username is not null)')
>>
>> > x.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter isnotnull(username#91)
>>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>>ReadSchema: struct
>> > y.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>    +- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>    Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>    PartitionFilters: [],
>>    PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>    ReadSchema: struct
>>
>> Presently I’m working around this by using the new functionality of NOT
>> EXISTS in Spark 2, but that seems like overkill.
>>
>> Any help appreciated.
>>
>>
>> *Alexi Kostibas*Engineering
>> *Nuna*
>> 650 Townsend Street, Suite 425
>> San Francisco, CA 94103
>>
>>
>
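A possibly lighter workaround than NOT EXISTS (a sketch only, reusing the temp
table from Everett's repro above): not(username is not null) is logically the
same as username is null, and a plain IS NULL predicate avoids the
double-negation pushdown entirely.

// Equivalent inverse filter; this should push down only [IsNull(username)].
val inverse = spark.sql(
  "select count(*) from filter_test_table where username is null")
inverse.explain()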


Case class with POJO - encoder issues

2017-02-11 Thread Jason White
I'd like to create a Dataset using some classes from Geotools to do some
geospatial analysis. In particular, I'm trying to use Spark to distribute
the work based on ID and label fields that I extract from the polygon data.

My simplified case class looks like this:
implicit val geometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
case class IndexedGeometry(label: String, tract: Geometry)

When I try to create a dataset using this case class, it gives me this error
message:
Exception in thread "main" java.lang.UnsupportedOperationException: No
Encoder found for com.vividsolutions.jts.geom.Geometry
- field (class: "com.vividsolutions.jts.geom.Geometry", name: "tract")
- root class: "org.me.HelloWorld.IndexedGeometry"

If I add another encoder for my case class...:
implicit val indexedGeometryEncoder: Encoder[IndexedGeometry] =
Encoders.kryo[IndexedGeometry]

...it works, but now the entire dataset has a single field, "value", and
it's a binary blob.

Is there a way to do what I'm trying to do?
I believe this PR is related, but it's been idle since December:
https://github.com/apache/spark/pull/15918
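One possible workaround sketch (an assumption on my part, not a confirmed fix):
keep the geometry as WKB bytes inside the case class so Spark can derive an
ordinary struct encoder, and convert to and from the JTS Geometry at the edges.

import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.{WKBReader, WKBWriter}

// Encodable as a normal two-column struct: (label: string, tractWkb: binary).
case class IndexedGeometryRow(label: String, tractWkb: Array[Byte])

object IndexedGeometryRow {
  def fromGeometry(label: String, tract: Geometry): IndexedGeometryRow =
    IndexedGeometryRow(label, new WKBWriter().write(tract))

  def toGeometry(row: IndexedGeometryRow): (String, Geometry) =
    (row.label, new WKBReader().read(row.tractWkb))
}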




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Case-class-with-POJO-encoder-issues-tp28381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-11 Thread Henry Tremblay

51,000 files at about 1/2 MB per file. I am wondering if I need this

http://docs.aws.amazon.com/emr/latest/ReleaseGuide/UsingEMR_s3distcp.html

Although, if I am understanding you correctly, even if I copy the S3 
files to HDFS on EMR and use wholeTextFiles, am I still only going to 
be able to use a single executor?


Henry

On 02/11/2017 01:03 PM, Jörn Franke wrote:
Can you post more information about the number of files, their size, 
and the executor logs?


A gzipped file is not splittable, i.e. only one executor can gunzip it 
(the unzipped data can then be processed in parallel).
wholeTextFiles was designed to be executed on only one executor (e.g. 
for processing XMLs, which are difficult to process in parallel).


Then, if you have small files (< HDFS blocksize) they are also only 
processed on one executor by default.


You may repartition though for parallel processing in even those cases.

On 11 Feb 2017, at 21:40, Paul Tremblay > wrote:


I've been working on this problem for several days (I am doing more 
to increase my knowledge of Spark). The code you linked to hangs 
because after reading in the file, I have to gunzip it.


Another way that seems to be working is reading each file in using 
sc.textFile, then writing it to HDFS, and then using 
wholeTextFiles on the HDFS result.


But the bigger issue is that both methods are not executed in 
parallel. When I open my YARN manager, it shows that only one node is 
being used.



Henry


On 02/06/2017 03:39 PM, Jon Gregg wrote:
Strange that it's working for some directories but not others.  
Looks like wholeTextFiles maybe doesn't work with S3? 
https://issues.apache.org/jira/browse/SPARK-4414 .


If it's possible to load the data into EMR and run Spark from there 
that may be a workaround.  This blogspot shows a python workaround 
that might work as well: 
http://michaelryanbell.com/processing-whole-files-spark-s3.html


Jon


On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay 
> wrote:


I've actually been able to trace the problem to the files being
read in. If I change to a different directory, then I don't get
the error. Is one of the executors running out of memory?





On 02/06/2017 02:35 PM, Paul Tremblay wrote:

When I try to create an rdd using wholeTextFiles, I get an
incomprehensible error. But when I use the same path with
sc.textFile, I get no error.

I am using pyspark with spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/

rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    963 # SparkContext#runJob.
    964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61 def deco(*a, **kw):
     62 try:
---> 63 return f(*a, **kw)
     64 except py4j.protocol.Py4JJavaError as e:
     65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317 raise Py4JJavaError(
    318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
    320 else:
    321 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost 

Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-11 Thread Jörn Franke
Can you post more information about the number of files, their size, and the 
executor logs?

A gzipped file is not splittable, i.e. only one executor can gunzip it (the 
unzipped data can then be processed in parallel). 
wholeTextFiles was designed to be executed on only one executor (e.g. for 
processing XMLs, which are difficult to process in parallel).

Then, if you have small files (< HDFS blocksize) they are also only processed 
on one executor by default.

You may repartition though for parallel processing in even those cases.
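A minimal Scala sketch of that repartition approach (the same calls exist in
PySpark; the path, the partition counts and the parsing step are placeholders):

// wholeTextFiles yields one record per file; with many small gzipped files the
// initial partitioning is coarse, so spread the work out explicitly.
val files = sc.wholeTextFiles("s3a://my-bucket/warc-input/", minPartitions = 200)
val parsed = files
  .repartition(200)                      // fan the files out across executors
  .mapValues(content => content.length)  // stand-in for the real per-file parsing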

> On 11 Feb 2017, at 21:40, Paul Tremblay  wrote:
> 
> I've been working on this problem for several days (I am doing more to 
> increase my knowledge of Spark). The code you linked to hangs because after 
> reading in the file, I have to gunzip it. 
> Another way that seems to be working is reading each file in using 
> sc.textFile, then writing it to HDFS, and then using wholeTextFiles on 
> the HDFS result. 
> But the bigger issue is that both methods are not executed in parallel. When 
> I open my YARN manager, it shows that only one node is being used. 
> 
> Henry
> 
>> On 02/06/2017 03:39 PM, Jon Gregg wrote:
>> Strange that it's working for some directories but not others.  Looks like 
>> wholeTextFiles maybe doesn't work with S3?  
>> https://issues.apache.org/jira/browse/SPARK-4414 .  
>> 
>> If it's possible to load the data into EMR and run Spark from there that may 
>> be a workaround.  This blogspot shows a python workaround that might work as 
>> well: http://michaelryanbell.com/processing-whole-files-spark-s3.html
>> 
>> Jon
>> 
>> 
>> On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay  
>> wrote:
>>> I've actually been able to trace the problem to the files being read in. If 
>>> I change to a different directory, then I don't get the error. Is one of 
>>> the executors running out of memory?
>>> 
>>> 
>>> 
>>> 
>>> 
 On 02/06/2017 02:35 PM, Paul Tremblay wrote:
 When I try to create an rdd using wholeTextFiles, I get an 
 incomprehensible error. But when I use the same path with sc.textFile, I 
 get no error.
 
 I am using pyspark with spark 2.1.
 
 in_path = 
 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/
 
 rdd = sc.wholeTextFiles(in_path)
 
 rdd.take(1)
 
 
 /usr/lib/spark/python/pyspark/rdd.py in take(self, num)
1341
1342 p = range(partsScanned, min(partsScanned + 
 numPartsToTry, totalParts))
 -> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
1344
1345 items += res
 
 /usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, 
 partitionFunc, partitions, allowLocal)
 963 # SparkContext#runJob.
 964 mappedRDD = rdd.mapPartitions(partitionFunc)
 --> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), 
 mappedRDD._jrdd, partitions)
 966 return list(_load_from_socket(port, 
 mappedRDD._jrdd_deserializer))
 967
 
 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in 
 __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
 -> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
 
 /usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
  61 def deco(*a, **kw):
  62 try:
 ---> 63 return f(*a, **kw)
  64 except py4j.protocol.Py4JJavaError as e:
  65 s = e.java_exception.toString()
 
 /usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in 
 get_return_value(answer, gateway_client, target_id, name)
 317 raise Py4JJavaError(
 318 "An error occurred while calling {0}{1}{2}.\n".
 --> 319 format(target_id, ".", name), value)
 320 else:
 321 raise Py4JError(
 
 Py4JJavaError: An error occurred while calling 
 z:org.apache.spark.api.python.PythonRDD.runJob.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 
 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): 
 ExecutorLostFailure (executor 8 exited caused by one of the running tasks) 
 Reason: Container marked as failed: container_1486415078210_0005_01_16 
 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. 
 Diagnostics: Exception from container-launch.
 Container id: container_1486415078210_0005_01_16
 Exit code: 52
 Stack trace: ExitCodeException exitCode=52:
 

Re: wholeTextFiles fails, but textFile succeeds for same path

2017-02-11 Thread Paul Tremblay
I've been working on this problem for several days (I am doing more to 
increase my knowledge of Spark). The code you linked to hangs because 
after reading in the file, I have to gunzip it.


Another way that seems to be working is reading each file in using 
sc.textFile, then writing it to HDFS, and then using wholeTextFiles 
on the HDFS result.


But the bigger issue is that both methods are not executed in parallel. 
When I open my YARN manager, it shows that only one node is being used.



Henry


On 02/06/2017 03:39 PM, Jon Gregg wrote:
Strange that it's working for some directories but not others.  Looks 
like wholeTextFiles maybe doesn't work with S3? 
https://issues.apache.org/jira/browse/SPARK-4414 .


If it's possible to load the data into EMR and run Spark from there 
that may be a workaround.  This blogspot shows a python workaround 
that might work as well: 
http://michaelryanbell.com/processing-whole-files-spark-s3.html


Jon


On Mon, Feb 6, 2017 at 6:38 PM, Paul Tremblay > wrote:


I've actually been able to trace the problem to the files being
read in. If I change to a different directory, then I don't get
the error. Is one of the executors running out of memory?





On 02/06/2017 02:35 PM, Paul Tremblay wrote:

When I try to create an rdd using wholeTextFiles, I get an
incomprehensible error. But when I use the same path with
sc.textFile, I get no error.

I am using pyspark with spark 2.1.

in_path = 's3://commoncrawl/crawl-data/CC-MAIN-2016-50/segments/1480698542939.6/warc/

rdd = sc.wholeTextFiles(in_path)

rdd.take(1)


/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1341
   1342 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1343 res = self.context.runJob(self, takeUpToNumLeft, p)
   1344
   1345 items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    963 # SparkContext#runJob.
    964 mappedRDD = rdd.mapPartitions(partitionFunc)
--> 965 port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    966 return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    967

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131 answer = self.gateway_client.send_command(command)
   1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
   1134
   1135 for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61 def deco(*a, **kw):
     62 try:
---> 63 return f(*a, **kw)
     64 except py4j.protocol.Py4JJavaError as e:
     65 s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317 raise Py4JJavaError(
    318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
    320 else:
    321 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 7, ip-172-31-45-114.us-west-2.compute.internal, executor 8): ExecutorLostFailure (executor 8 exited caused by one of the running tasks) Reason: Container marked as failed: container_1486415078210_0005_01_16 on host: ip-172-31-45-114.us-west-2.compute.internal. Exit status: 52. Diagnostics: Exception from container-launch.
Container id: container_1486415078210_0005_01_16
Exit code: 52
Stack trace: ExitCodeException exitCode=52:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at


Re: Turning rows into columns

2017-02-11 Thread Paul Tremblay

Yes, that's what I need. Thanks.


P.


On 02/05/2017 12:17 PM, Koert Kuipers wrote:
Since there is no key to group by and assemble records, I would suggest 
writing this in RDD land and then converting to a data frame. You can use 
sc.wholeTextFiles to process the text files and create a state machine.
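A rough Scala sketch of that idea, applied to WARC-style header lines like the
ones quoted below (the path, the record-splitting rule and the chosen columns
are assumptions):

// Read whole files, split each file into records on blank lines, and turn the
// "Key: value" header lines of each record into a Map, then into columns.
val raw = sc.wholeTextFiles("s3a://my-bucket/warc-files/")

val records = raw.flatMap { case (_, content) =>
  content.split("\r?\n\r?\n").map { block =>
    block.split("\r?\n")
      .filter(_.contains(": "))
      .map { line =>
        val Array(k, v) = line.split(": ", 2)
        k.trim -> v.trim
      }
      .toMap
  }.filter(_.contains("WARC-Type"))
}

import spark.implicits._   // assumes a Spark 2.x SparkSession named `spark`
val df = records
  .map(m => (m.getOrElse("WARC-Type", ""), m.getOrElse("WARC-Date", ""),
             m.getOrElse("WARC-Record-ID", "")))
  .toDF("warc_type", "warc_date", "warc_record_id")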


On Feb 4, 2017 16:25, "Paul Tremblay" > wrote:


I am using pyspark 2.1 and am wondering how to convert a flat
file, with one record per row, into a columnar format.

Here is an example of the data:

u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename:
CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz',
 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo:
http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf
',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/
',
 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru ',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/)
',
 u'Accept:
text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To:
',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/
',
 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to convert it to something like:
{warc-type='request', warc-date='2016-12-02',
warc-record-id='

Re: Getting exit code of pipe()

2017-02-11 Thread Felix Cheung
Do you want the job to fail if there is an error exit code?

You could set checkCode to True
spark.apache.org/docs/latest/api/python/pyspark.html?highlight=pipe#pyspark.RDD.pipe

Otherwise, maybe you want to output the status to stdout so you could process 
it individually.
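For illustration, a Scala sketch of the "write the status to stdout" idea (the
same pipe-and-parse pattern applies in PySpark; the wrapper command and the
EXITCODE marker are assumptions, not part of any Spark API):

// Wrap the command so each task's exit code becomes its last output line.
val out = rdd.pipe(Seq("bash", "-c", "./run.sh; echo EXITCODE:$?"), sys.env)

val exitCodes = out.filter(_.startsWith("EXITCODE:"))
                   .map(_.stripPrefix("EXITCODE:").trim.toInt)
val failedTasks = exitCodes.filter(_ != 0).count()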


_
From: Xuchen Yao >
Sent: Friday, February 10, 2017 11:18 AM
Subject: Getting exit code of pipe()
To: >


Hello Community,

I have the following Python code that calls an external command:

rdd.pipe('run.sh', env=os.environ).collect()

run.sh can exit with either status 1 or 0; how can I get the exit code from 
Python? Thanks!

Xuchen




Disable Spark SQL Optimizations for unit tests

2017-02-11 Thread Stefan Ackermann
Hi,

Can the Spark SQL Optimizations be disabled somehow?

In our project we started four weeks ago to write Scala / Spark / DataFrame
code. We currently have only around 10% of the planned project scope, and we
are already waiting 10 (Spark 2.1.0, everything cached) to 30 (Spark 1.6,
nothing cached) minutes for a single unit test run to finish. We have, for
example, one Scala file with maybe 80 lines of code (several joins, several
subtrees reused in different places) that takes up to 6 minutes to be
optimized (the Catalyst output is also > 100 MB). The input for our unit
tests is usually 2-3 rows. That is the motivation to disable the optimizer
in unit tests.

I have found an unanswered SO post, but not much more on that topic. I have
also found the SimpleTestOptimizer, which sounds perfect, but I have no idea
how to instantiate a Spark session so that it uses that one.

Does nobody else have this problem? Is there something fundamentally wrong
with our approach?

Regards,
Stefan Ackermann



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Disable-Spark-SQL-Optimizations-for-unit-tests-tp28380.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: From C* to DataFrames with JSON

2017-02-11 Thread Takeshi Yamamuro
If you upgrade to v2.1, you can use to_json/from_json in sql.functions.
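For reference, a minimal sketch of that, assuming the C* table has already been
loaded into a DataFrame `df` with columns id and jsonData (on Spark 1.6,
get_json_object offers a per-field alternative):

import org.apache.spark.sql.functions.{col, from_json, get_json_object}
import org.apache.spark.sql.types.{StringType, StructType}

// Spark 2.1: parse the whole JSON column into a struct, then flatten it.
val schema = new StructType().add("a", StringType).add("b", StringType)
val flat = df
  .select(col("id"), from_json(col("jsonData"), schema).as("j"))
  .select(col("id"), col("j.a").as("a"), col("j.b").as("b"))

// Spark 1.6 alternative: extract the fields one by one.
val flat16 = df.select(col("id"),
  get_json_object(col("jsonData"), "$.a").as("a"),
  get_json_object(col("jsonData"), "$.b").as("b"))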

On Fri, Feb 10, 2017 at 3:12 PM, Jean-Francois Gosselin <
jfgosse...@gmail.com> wrote:

>
> Hi all,
>
> I'm struggling (Spark / Scala newbie) to create a DataFrame from a C*
> table, and also to create a DataFrame from a column with JSON.
>
> e.g. From C* table
>
> | id | jsonData                  |
> +----+---------------------------+
> | 1  | {"a": "123", "b": "xyz" } |
> | 2  | {"a": "3", "b": "bar" }   |
>
>
> to Spark DataFrame:
>
> | id | a   | b   |
> +----+-----+-----+
> | 1  | 123 | xyz |
> | 2  | 3   | bar |
>
>
> I'm using Spark 1.6 .
>
> Thanks
>
>
> JF
>



-- 
---
Takeshi Yamamuro


Re: EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Md. Rezaul Karim
Thanks for the great help. Appreciated!

Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 11 February 2017 at 13:11, Takeshi Yamamuro 
wrote:

> Moved to https://github.com/amplab/spark-ec2.
> Yeah, I think the script was just moved there, so you can use it in the
> same way.
>
> On Sat, Feb 11, 2017 at 9:59 PM, Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Hi Takeshi,
>>
>> Now I understand that the spark-ec2 script was moved to AMPLab. How can I
>> use it from its new location/URL, please? Alternatively, can I use the
>> same script provided with prior Spark releases?
>>
>> Regards,
>> _
>> *Md. Rezaul Karim*, BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> 
>>
>> On 11 February 2017 at 12:43, Takeshi Yamamuro 
>> wrote:
>>
>>> Hi,
>>>
>>> Have you checked this?
>>> https://issues.apache.org/jira/browse/SPARK-12735
>>>
>>> // maropu
>>>
>>> On Sat, Feb 11, 2017 at 9:34 PM, Md. Rezaul Karim <
>>> rezaul.ka...@insight-centre.org> wrote:
>>>
 Dear Spark Users,

 I was wondering why the EC2 script is missing in Spark release
 2.0.0.~2.1.0? Is there any specific reason for that?

 Please note that I have chosen the package type: Pre-built for Hadoop
 2.7 and later for Spark 2.1.0 for example. Am I doing something wrong?



 Regards,
 _
 *Md. Rezaul Karim*, BSc, MSc
 PhD Researcher, INSIGHT Centre for Data Analytics
 National University of Ireland, Galway
 IDA Business Park, Dangan, Galway, Ireland
 Web: http://www.reza-analytics.eu/index.html
 

>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Takeshi Yamamuro
Moved to https://github.com/amplab/spark-ec2.
Yeah, I think the script was just moved there, so you can use it in the same
way.

On Sat, Feb 11, 2017 at 9:59 PM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi Takeshi,
>
> Now I understand that the spark-ec2 script was moved to AMPLab. How can I
> use it from its new location/URL, please? Alternatively, can I use the
> same script provided with prior Spark releases?
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>
> On 11 February 2017 at 12:43, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Have you checked this?
>> https://issues.apache.org/jira/browse/SPARK-12735
>>
>> // maropu
>>
>> On Sat, Feb 11, 2017 at 9:34 PM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Dear Spark Users,
>>>
>>> I was wondering why the EC2 script is missing in Spark releases
>>> 2.0.0~2.1.0. Is there any specific reason for that?
>>>
>>> Please note that I have chosen the package type: Pre-built for Hadoop
>>> 2.7 and later for Spark 2.1.0 for example. Am I doing something wrong?
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim*, BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> 
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Md. Rezaul Karim
Hi Takeshi,

Now I understand that the spark-ec2 script was moved to AMPLab. How can I
use it from its new location/URL, please? Alternatively, can I use the
same script provided with prior Spark releases?

Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html


On 11 February 2017 at 12:43, Takeshi Yamamuro 
wrote:

> Hi,
>
> Have you checked this?
> https://issues.apache.org/jira/browse/SPARK-12735
>
> // maropu
>
> On Sat, Feb 11, 2017 at 9:34 PM, Md. Rezaul Karim <
> rezaul.ka...@insight-centre.org> wrote:
>
>> Dear Spark Users,
>>
>> I was wondering why the EC2 script is missing in Spark releases
>> 2.0.0~2.1.0. Is there any specific reason for that?
>>
>> Please note that I have chosen the package type: Pre-built for Hadoop 2.7
>> and later for Spark 2.1.0 for example. Am I doing something wrong?
>>
>>
>>
>> Regards,
>> _
>> *Md. Rezaul Karim*, BSc, MSc
>> PhD Researcher, INSIGHT Centre for Data Analytics
>> National University of Ireland, Galway
>> IDA Business Park, Dangan, Galway, Ireland
>> Web: http://www.reza-analytics.eu/index.html
>> 
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Takeshi Yamamuro
Hi,

Have you checked this?
https://issues.apache.org/jira/browse/SPARK-12735

// maropu

On Sat, Feb 11, 2017 at 9:34 PM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear Spark Users,
>
> I was wondering why the EC2 script is missing in Spark releases
> 2.0.0~2.1.0. Is there any specific reason for that?
>
> Please note that I have chosen the package type: Pre-built for Hadoop 2.7
> and later for Spark 2.1.0 for example. Am I doing something wrong?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> 
>



-- 
---
Takeshi Yamamuro


EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Md. Rezaul Karim
Dear Spark Users,

I was wondering why the EC2 script is missing in Spark releases
2.0.0~2.1.0. Is there any specific reason for that?

Please note that I have chosen the package type: Pre-built for Hadoop 2.7
and later for Spark 2.1.0 for example. Am I doing something wrong?



Regards,
_
*Md. Rezaul Karim*, BSc, MSc
PhD Researcher, INSIGHT Centre for Data Analytics
National University of Ireland, Galway
IDA Business Park, Dangan, Galway, Ireland
Web: http://www.reza-analytics.eu/index.html



Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Sam Elamin
Here's a link to the thread

http://apache-spark-developers-list.1001551.n3.nabble.com/Structured-Streaming-Dropping-Duplicates-td20884.html
On Sat, 11 Feb 2017 at 08:47, Sam Elamin  wrote:

> Hey Egor
>
>
> You can use a ForeachWriter or you can write a custom sink. I personally
> went with a custom sink since I get a DataFrame per batch:
>
>
> https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala
>
> You can have a look at how I implemented something similar to the file sink
> that, in the event of a failure, skips batches already written.
>
>
> Also have a look at Michael's reply to me a few days ago on exactly the
> same topic. The email subject was "Structured Streaming - Dropping
> Duplicates".
>
>
> Regards
>
> Sam
>
> On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski  wrote:
>
> "Something like that". I've never tried it out myself, so I'm only
> guessing, having had a brief look at the API.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov 
> wrote:
> > Jacek, so I create a cache in ForeachWriter, write to it in every
> > "process()" call, and flush it on close? Something like that?
> >
> > 2017-02-09 12:42 GMT-08:00 Jacek Laskowski :
> >>
> >> Hi,
> >>
> >> Yes, that's ForeachWriter.
> >>
> >> Yes, it works with element by element. You're looking for mapPartition
> >> and ForeachWriter has partitionId that you could use to implement a
> >> similar thing.
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >>
> >> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov 
> >> wrote:
> >> > Jacek, you mean
> >> >
> >> >
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
> >> > ? I do not understand how to use it, since it passes every value
> >> > separately, not every partition. And adding to the table value by
> >> > value would not work.
> >> >
> >> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski :
> >> >>
> >> >> Hi,
> >> >>
> >> >> Have you considered foreach sink?
> >> >>
> >> >> Jacek
> >> >>
> >> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" 
> wrote:
> >> >>>
> >> >>> Hi, I'm thinking of using Structured Streaming instead of old
> >> >>> streaming,
> >> >>> but I need to be able to save results to Hive table. Documentation
> for
> >> >>> file
> >> >>> sink
> >> >>>
> >> >>> says(
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
> ):
> >> >>> "Supports writes to partitioned tables. ". But being able to write
> to
> >> >>> partitioned directories is not enough to write to the table: someone
> >> >>> needs
> >> >>> to write to Hive metastore. How can I use Structured Streaming and
> >> >>> write to
> >> >>> Hive table?
> >> >>>
> >> >>> --
> >> >>> Sincerely yours
> >> >>> Egor Pakhomov
> >> >
> >> >
> >> >
> >> >
> >> > --
> >> > Sincerely yours
> >> > Egor Pakhomov
> >
> >
> >
> >
> > --
> > Sincerely yours
> > Egor Pakhomov
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Structured Streaming] Using File Sink to store to hive table.

2017-02-11 Thread Sam Elamin
Hey Egor


You can use a ForeachWriter or you can write a custom sink. I personally
went with a custom sink since I get a DataFrame per batch:

https://github.com/samelamin/spark-bigquery/blob/master/src/main/scala/com/samelamin/spark/bigquery/streaming/BigQuerySink.scala

You can have a look at how I implemented something similar to the file sink
that, in the event of a failure, skips batches already written.


Also have a look at Michael's reply to me a few days ago on exactly the same
topic. The email subject was "Structured Streaming - Dropping Duplicates".


Regards

Sam

On Sat, 11 Feb 2017 at 07:59, Jacek Laskowski  wrote:

"Something like that" I've never tried it out myself so I'm only
guessing having a brief look at the API.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sat, Feb 11, 2017 at 1:31 AM, Egor Pahomov 
wrote:
> Jacek, so I create a cache in ForeachWriter, write to it in every
> "process()" call, and flush it on close? Something like that?
>
> 2017-02-09 12:42 GMT-08:00 Jacek Laskowski :
>>
>> Hi,
>>
>> Yes, that's ForeachWriter.
>>
>> Yes, it works with element by element. You're looking for mapPartition
>> and ForeachWriter has partitionId that you could use to implement a
>> similar thing.
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Thu, Feb 9, 2017 at 3:55 AM, Egor Pahomov 
>> wrote:
>> > Jacek, you mean
>> >
>> >
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.ForeachWriter
>> > ? I do not understand how to use it, since it passes every value
>> > separately, not every partition. And adding to the table value by value
>> > would not work.
>> >
>> > 2017-02-07 12:10 GMT-08:00 Jacek Laskowski :
>> >>
>> >> Hi,
>> >>
>> >> Have you considered foreach sink?
>> >>
>> >> Jacek
>> >>
>> >> On 6 Feb 2017 8:39 p.m., "Egor Pahomov" 
wrote:
>> >>>
>> >>> Hi, I'm thinking of using Structured Streaming instead of old
>> >>> streaming,
>> >>> but I need to be able to save results to Hive table. Documentation
for
>> >>> file
>> >>> sink
>> >>>
>> >>> says(
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
):
>> >>> "Supports writes to partitioned tables. ". But being able to write to
>> >>> partitioned directories is not enough to write to the table: someone
>> >>> needs
>> >>> to write to Hive metastore. How can I use Structured Streaming and
>> >>> write to
>> >>> Hive table?
>> >>>
>> >>> --
>> >>> Sincerely yours
>> >>> Egor Pakhomov
>> >
>> >
>> >
>> >
>> > --
>> > Sincerely yours
>> > Egor Pakhomov
>
>
>
>
> --
> Sincerely yours
> Egor Pakhomov

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org