Re: [External] Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?

2017-01-04 Thread Ben Teeuwen
>>>>>>>> the ETL tools would too...
>>>>>>>> 4. Finally, update check point...
>>>>>>>>
>>>>>>>> You may "determine" checkpoint from the data you already have in
>>>>>>>> HDFS if you create a Hive structure on it.
>>>>>>>>
>>>>>>>> Best
>>>>>>>> AYan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> why not sync binlog of mysql(hopefully the data is immutable and
>>>>>>>>> the table is append-only), send the log through kafka and then 
>>>>>>>>> consume it
>>>>>>>>> by spark streaming?
>>>>>>>>>
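
For reference, consuming such a binlog topic from Spark with the Structured
Streaming Kafka source would look roughly like the sketch below. The broker
address and topic name are hypothetical, and the spark-sql-kafka-0-10 package
is assumed to be on the classpath.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("binlog-consumer").getOrCreate()

# Read the Kafka topic as an unbounded streaming DataFrame.
stream = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker1:9092")   # hypothetical broker
          .option("subscribe", "mysql_binlog_topic")           # hypothetical topic
          .load()
          .selectExpr("CAST(value AS STRING) AS payload"))

# Console sink only to show the plumbing; swap in a real sink as needed.
query = (stream.writeStream
         .format("console")
         .outputMode("append")
         .start())
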
>>>>>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>>>>>>>> mich...@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>>>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>>>>>
>>>>>>>>>> In the mean time you could try implementing your own Source, but
>>>>>>>>>> that is pretty low level and is not yet a stable API.
>>>>>>>>>>
>>>>>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>>>>>>>> yyz1...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>>>>>>>
>>>>>>>>>>> I don't want to waste your time, so before I write to you, I
>>>>>>>>>>> googled, checked stackoverflow and mailing list archive with 
>>>>>>>>>>> keywords
>>>>>>>>>>> "streaming" and "jdbc". But I was not able to get any solution to 
>>>>>>>>>>> my use
>>>>>>>>>>> case. I hope I can get some clarification from you.
>>>>>>>>>>>
>>>>>>>>>>> The use case is quite straightforward, I need to harvest a
>>>>>>>>>>> relational database via jdbc, do something with data, and store 
>>>>>>>>>>> result into
>>>>>>>>>>> Kafka. I am stuck at the first step, and the difficulty is as 
>>>>>>>>>>> follows:
>>>>>>>>>>>
>>>>>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>>>>>> 2. The database is dynamic and time series data comes in
>>>>>>>>>>> constantly.
>>>>>>>>>>>
>>>>>>>>>>> Then an ideal workflow is that multiple workers process
>>>>>>>>>>> partitions of data incrementally according to a time window. For 
>>>>>>>>>>> example,
>>>>>>>>>>> the processing starts from the earliest data with each batch 
>>>>>>>>>>> containing
>>>>>>>>>>> data for one hour. If data ingestion speed is faster than data 
>>>>>>>>>>> production
>>>>>>>>>>> speed, then eventually the entire database will be harvested and 
>>>>>>>>>>> those
>>>>>>>>>>> workers will start to "tail" the database for new data streams and 
>>>>>>>>>>> the
>>>>>>>>>>> processing becomes real time.
>>>>>>>>>>>
>>>>>>>>>>> With Spark SQL I can ingest data from a JDBC source with
>>>>>>>>>>> partitions divided by time windows, but how can I dynamically 
>>>>>>>>>>> increment the
>>>>>>>>>>> time windows during execution? Assume that there are two workers 
>>>>>>>>>>> ingesting
>>>>>>>>>>> data of 2017-01-01 and 2017-01-02, the one which finishes quicker 
>>>>>>>>>>> gets next
>>>>>>>>>>> task for 2017-01-03. But I am not able to find out how to increment 
>>>>>>>>>>> those
>>>>>>>>>>> values during execution.
>>>>>>>>>>>
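
For reference, the kind of partitioned JDBC read being described looks roughly
like the sketch below (URL, table, column name and credentials are
hypothetical). Each predicate becomes one partition, i.e. one parallel query,
which is the batch equivalent of the hourly time windows mentioned above.

predicates = [
    "event_ts >= '2017-01-01 00:00:00' AND event_ts < '2017-01-01 01:00:00'",
    "event_ts >= '2017-01-01 01:00:00' AND event_ts < '2017-01-01 02:00:00'",
]
df = spark.read.jdbc(
    url="jdbc:mysql://dbhost:3306/mydb",
    table="events",
    predicates=predicates,
    properties={"user": "spark", "password": "secret"},
)
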
>>>>>>>>>>> Then I looked into Structured Streaming. It looks much more
>>>>>>>>>>> promising because window operations based on event time are 
>>>>>>>>>>> considered
>>>>>>>>>>> during streaming, which could be the solution to my use case. 
>>>>>>>>>>> However, from
>>>>>>>>>>> documentation and code example I did not find anything related to 
>>>>>>>>>>> streaming
>>>>>>>>>>> data from a growing database. Is there anything I can read to 
>>>>>>>>>>> achieve my
>>>>>>>>>>> goal?
>>>>>>>>>>>
>>>>>>>>>>> Any suggestion is highly appreciated. Thank you very much and
>>>>>>>>>>> have a nice day.
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>> Yang
>>>>>>>>>>> 
>>>>>>>> --
>>>>>>>> Best Regards,
>>>>>>>> Ayan Guha


-- 
Ben Teeuwen
Senior Data Scientist

Booking.com B.V.
Rembrandtplein 29-45 Amsterdam 1017CT Netherlands
Direct +31207153058


Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-22 Thread Ben Teeuwen
I don't think scaling RAM is a sane strategy for fixing these problems with 
using a dataframe / transformer approach to creating large sparse vectors.

One, though it delays the point of failure, it will still fail. I tried this 
for the original case I emailed about, and after waiting 50 minutes it still 
broke.

Second, if you don't use dataframes / transformers but write your own functions 
to do one-hot encoding and create sparse vectors, it easily works on small 
boxes. E.g. build up a dictionary with unique index numbers for all unique 
values, and look up these indexes when creating the sparse vectors:

def makeDict(df, columnName):
    dict = df.select(columnName).map(lambda x: unicode(x[0])).distinct().zipWithIndex().collectAsMap()
    dict["missing"] = len(dict)
    return dict

def encodeOneHot(x, column):
    key = "missing"
    if unicode(x) in mappings_bc[column]:
        key = unicode(x)
    return Vectors.sparse(len(mappings_bc[column]), [mappings_bc[column][key]], [1.0])
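
For context, a minimal usage sketch of the two helpers above. The DataFrame and 
column name are hypothetical; on Spark 2.x DataFrames you may need .rdd.map(...) 
where makeDict uses .map(...). Note that encodeOneHot indexes mappings_bc 
directly, so it is kept as a plain dict here; for a really large dictionary you 
would wrap it in sc.broadcast(...) and read .value inside the function instead.

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT   # Vectors is what encodeOneHot uses

mappings_bc = {'bigfeature': makeDict(df, 'bigfeature')}   # per-column value -> index map

encode_udf = udf(lambda x: encodeOneHot(x, 'bigfeature'), VectorUDT())
df_ohe = df.withColumn('bigfeatureOHE', encode_udf('bigfeature'))
df_ohe.show(1)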

Ben

> On Aug 19, 2016, at 11:34 PM, Davies Liu <dav...@databricks.com> wrote:
> 
> The OOM happen in driver, you may also need more memory for driver.
> 
> On Fri, Aug 19, 2016 at 2:33 PM, Davies Liu <dav...@databricks.com> wrote:
>> You are using lots of tiny executors (128 executors with only 2G
>> memory). Could you try with bigger executors (for example 16G x 16)?
>> 
>> On Fri, Aug 19, 2016 at 8:19 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>>> 
>>> So I wrote some code to reproduce the problem.
>>> 
>>> I assume here that a pipeline should be able to transform a categorical 
>>> feature with a few million levels.
>>> So I create a dataframe with the categorical feature (‘id’), apply a 
>>> StringIndexer and OneHotEncoder transformer, and run a loop where I 
>>> increase the number of levels.
>>> It breaks at 1,276,000 levels.
>>> 
>>> Shall I report this as a ticket in JIRA?
>>> 
>>> 
>>> 
>>> 
>>> from pyspark.sql.functions import rand
>>> from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
>>> from pyspark.ml import Pipeline
>>> 
>>> start_id = 10
>>> n = 500
>>> step = (n - start_id) / 25
>>> 
>>> for i in xrange(start_id, start_id + n, step):
>>>     print "#\n {}".format(i)
>>>     dfr = (sqlContext
>>>            .range(start_id, start_id + i)
>>>            .withColumn('label', rand(seed=10))
>>>            .withColumn('feat2', rand(seed=101))
>>>            # .withColumn('normal', randn(seed=27))
>>>            ).repartition(32).cache()
>>>     # dfr.select("id", rand(seed=10).alias("uniform"), randn(seed=27).alias("normal")).show()
>>>     dfr.show(1)
>>>     print "This dataframe has {0} rows (and therefore {0} levels will be one hot encoded)".format(dfr.count())
>>> 
>>>     categorical_feature = ['id']
>>>     stages = []
>>> 
>>>     for c in categorical_feature:
>>>         stages.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>>>         stages.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
>>> 
>>>     columns = ["{}OHE".format(x) for x in categorical_feature]
>>>     columns.append('feat2')
>>> 
>>>     assembler = VectorAssembler(
>>>         inputCols=columns,
>>>         outputCol="features")
>>>     stages.append(assembler)
>>> 
>>>     df2 = dfr
>>> 
>>>     pipeline = Pipeline(stages=stages)
>>>     pipeline_fitted = pipeline.fit(df2)
>>>     df3 = pipeline_fitted.transform(df2)
>>>     df3.show(1)
>>>     dfr.unpersist()
>>> 
>>> 
>>> 
>>> 
>>> Output:
>>> 
>>> 
>>> #
>>> 10
>>> +------+-------------------+-------------------+
>>> |    id|              label|              feat2|
>>> +------+-------------------+-------------------+
>>> |183601|0.38693226548356197|0.04485291680169634|
>>> +------+-------------------+-------------------+
>>> only showing top 1 row
>>> 
>>> This dataframe has 10 rows (and therefore 10 levels will be one hot 
>>> encoded)
>>> +--+---+

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-19 Thread Ben Teeuwen
2 return_value = get_return_value(
--> 933 answer, self.gateway_client, self.target_id, self.name)
934 
935 for temp_arg in temp_args:

/opt/spark/2.0.0/python/pyspark/sql/utils.py in deco(*a, **kw)
 61 def deco(*a, **kw):
 62 try:
---> 63 return f(*a, **kw)
 64 except py4j.protocol.Py4JJavaError as e:
 65 s = e.java_exception.toString()

/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in 
get_return_value(answer, gateway_client, target_id, name)
310 raise Py4JJavaError(
311 "An error occurred while calling {0}{1}{2}.\n".
--> 312 format(target_id, ".", name), value)
313 else:
314 raise Py4JError(

Py4JJavaError: An error occurred while calling o408.transform.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.Stream$.from(Stream.scala:1262)
at 
scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
at 
scala.collection.immutable.Stream$$anonfun$from$1.apply(Stream.scala:1262)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
at 
scala.collection.LinearSeqOptimized$class.loop$1(LinearSeqOptimized.scala:274)
at 
scala.collection.LinearSeqOptimized$class.lengthCompare(LinearSeqOptimized.scala:277)
at scala.collection.immutable.Stream.lengthCompare(Stream.scala:202)
at 
scala.collection.SeqViewLike$Zipped$class.length(SeqViewLike.scala:133)
at scala.collection.SeqViewLike$$anon$9.length(SeqViewLike.scala:203)
at 
scala.collection.SeqViewLike$Mapped$class.length(SeqViewLike.scala:66)
at scala.collection.SeqViewLike$$anon$3.length(SeqViewLike.scala:197)
at scala.collection.SeqLike$class.size(SeqLike.scala:106)
at 
scala.collection.SeqViewLike$AbstractTransformed.size(SeqViewLike.scala:37)
at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:285)
at 
scala.collection.SeqViewLike$AbstractTransformed.toArray(SeqViewLike.scala:37)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at 
org.apache.spark.ml.attribute.AttributeGroup$$anonfun$3.apply(AttributeGroup.scala:72)
at scala.Option.map(Option.scala:146)
at 
org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:70)
at 
org.apache.spark.ml.attribute.AttributeGroup.<init>(AttributeGroup.scala:65)
at 
org.apache.spark.ml.attribute.AttributeGroup$.fromMetadata(AttributeGroup.scala:234)
at 
org.apache.spark.ml.attribute.AttributeGroup$.fromStructField(AttributeGroup.scala:246)
at 
org.apache.spark.ml.feature.OneHotEncoder.transform(OneHotEncoder.scala:139)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128)



Spark Properties
Name                                Value
spark.app.name                      pyspark-shell
spark.driver.cores                  1
spark.driver.extraJavaOptions       -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
spark.driver.memory                 2g
spark.dynamicAllocation.enabled     FALSE
spark.eventLog.dir                  hdfs:///spark/history
spark.eventLog.enabled              TRUE
spark.executor.cores                1
spark.executor.extraJavaOptions     -XX:+UnlockDiagnosticVMOptions -XX:+PerfDisableSharedMem
spark.executor.id                   driver
spark.executor.instances            128
spark.executor.memory               2g
spark.history.fs.logDirectory       hdfs:///spark/history
spark.master                        yarn-client
spark.memory.fraction               0.7
spark.memory.storageFraction        0.5
spark.rdd.compress                  TRUE
spark.scheduler.mode                FIFO
spark.serializer.objectStreamReset  100
spark.shuffle.service.enabled       FALSE
spark.speculation                   TRUE
spark.submit.deployMode             client
spark.task.maxFailures              10
spark.yarn.executor.memoryOverhead  2048
spark.yarn.isPython                 TRUE


> On Aug 11, 2016, at 10:24 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> 
> Ok, interesting. Would be interested to see how it compares.
> 
> By the way, the feature size you select for the hasher should be a power of 2 
> (e.g. 2**24 to 2**26 may be worth trying) to ensure the feature indexes are 
> evenly distributed (see the section on HashingTF under 
> http://spark.apache.org/docs/latest/ml-features.html#t

Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
Hmm, hashing will probably send all of the records with the same key to the 
same partition / machine.
I'd try it out, and hope that if you have a few very large keys whose records 
exceed the RAM available on one node, they spill to disk. Maybe play with 
persist() and a different storage level.
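
A sketch of the persist() variant suggested above (the paths and the 'id' column 
come from the thread below; the exact storage level is just a suggestion). 
MEMORY_AND_DISK lets partitions that don't fit in executor memory spill to local 
disk instead of failing.

from pyspark import StorageLevel

a = spark.read.parquet('path_to_table_a').repartition('id')
a.persist(StorageLevel.MEMORY_AND_DISK)
a.count()   # force the shuffle / materialization

b = spark.read.parquet('path_to_table_b').repartition('id')
b.persist(StorageLevel.MEMORY_AND_DISK)
b.count()

joined = a.join(b, 'id', 'right_outer')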

> On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> 
> wrote:
> 
> Hi Ben,
> 
> and that will take care of skewed data?
> 
> Gourav 
> 
> On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
> When you read both ‘a’ and ‘b', can you try repartitioning both by column 
> ‘id’? 
> If you .cache() and .count() to force a shuffle, it'll push the records that 
> will be joined to the same executors. 
> 
> So;
> a = spark.read.parquet(‘path_to_table_a’).repartition(‘id’).cache()
> a.count()
> 
> b = spark.read.parquet(‘path_to_table_b').repartition(‘id’).cache()
> b.count()
> 
> And then join..
> 
> 
>> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
>> 
>> Hello,
>> We have two parquet inputs of the following form:
>> 
>> a: id:String, Name:String  (1.5TB)
>> b: id:String, Number:Int  (1.3GB)
>> 
>> We need to join these two to get (id, Number, Name). We've tried two 
>> approaches:
>> 
>> a.join(b, Seq("id"), "right_outer")
>> 
>> where a and b are dataframes. We also tried taking the rdds, mapping them to 
>> pair rdds with id as the key, and then joining. What we're seeing is that 
>> temp file usage is increasing on the join stage, and filling up our disks, 
>> causing the job to crash. Is there a way to join these two data sets without 
>> well...crashing?
>> 
>> Note, the ids are unique, and there's a one to one mapping between the two 
>> datasets. 
>> 
>> Any help would be appreciated.
>> 
>> -Ashic. 
> 
> 



Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-11 Thread Ben Teeuwen
Thanks Nick, I played around with the hashing trick. When I set numFeatures to 
the amount of distinct values for the largest sparse feature, I ended up with 
half of them colliding. When raising the numFeatures to have less collisions, I 
soon ended up with the same memory problems as before. To be honest, I didn’t 
test the impact of having more or less collisions on the quality of the 
predictions, but tunnel visioned into getting it to work with the full sparsity.
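
For what it's worth, a rough sketch of how the collision rate for a given 
numFeatures could be measured up front (the column name 'bigfeature' and the 
DataFrame df are hypothetical): hash each distinct value once and compare the 
number of distinct hash buckets with the number of distinct values.

from pyspark.ml.feature import HashingTF
from pyspark.sql.functions import udf, countDistinct
from pyspark.sql.types import ArrayType, StringType, IntegerType

to_terms = udf(lambda s: ["bigfeature=%s" % s], ArrayType(StringType()))
distinct_vals = df.select('bigfeature').distinct().withColumn('terms', to_terms('bigfeature'))

hasher = HashingTF(numFeatures=2**26, inputCol='terms', outputCol='hashed')
hashed = hasher.transform(distinct_vals)

# Each row holds exactly one term, so its sparse vector has exactly one index.
first_idx = udf(lambda v: int(v.indices[0]), IntegerType())
n_values = distinct_vals.count()
n_buckets = hashed.select(first_idx('hashed').alias('idx')).agg(countDistinct('idx')).first()[0]
print("collision rate ~ {:.2%}".format(1.0 - float(n_buckets) / n_values))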

Before, I worked in RDD land (zipWithIndex on an rdd with the distinct values + 
one entry 'missing' for values unseen during predict, collectAsMap, broadcast 
the map, a udf generating sparse vectors, assembling the vectors manually). To 
move into dataframe land, I wrote:

def getMappings(mode):
    mappings = defaultdict(dict)
    max_index = 0
    for c in cat_int[:]:  # for every categorical variable

        logging.info("starting with {}".format(c))
        if mode == 'train':
            grouped = (df2
                .groupBy(c).count().orderBy('count', ascending = False)  # get counts, ordered from largest to smallest
                .selectExpr("*", "1 as n")  # prepare for window function summing up 1s before current row to create a RANK
                .selectExpr("*", "SUM(n) OVER (ORDER BY count DESC ROWS BETWEEN UNBOUNDED PRECEDING AND 0 PRECEDING) + {} AS index".format(max_index))
                .drop('n')  # drop the column with static 1 values used for the cumulative sum
                )
            logging.info("Got {} rows.".format(grouped.count()))
            grouped.show()
            logging.info('getting max')
            max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()  # update the max index so next categorical feature starts with it.
            logging.info("max_index has become: {}".format(max_index))
            logging.info('adding missing value, so we also train on this and prediction data missing it. ')
            schema = grouped.schema
            logging.info(schema)
            grouped = grouped.union(spark.createDataFrame([('missing', 0, max_index + 1)], schema))  # add index for missing value for values during predict that are unseen during training.
            max_index += 1
            saveto = "{}/{}".format(path, c)
            logging.info("Writing to: {}".format(saveto))
            grouped.write.parquet(saveto, mode = 'overwrite')

        elif mode == 'predict':
            loadfrom = "{}/{}".format(path, c)
            logging.info("Reading from: {}".format(loadfrom))
            grouped = spark.read.parquet(loadfrom)

        logging.info("Adding to dictionary")
        mappings[c] = grouped.rdd.map(lambda r: r.asDict()).map(lambda d: (d[c], d['index'])).collectAsMap()  # build up dictionary to be broadcasted later on, used for creating sparse vectors
        max_index = grouped.selectExpr("MAX(index) t").rdd.map(lambda r: r.t).first()

    logging.info("Sanity check for indexes:")
    for c in cat_int[:]:
        logging.info("{} min: {} max: {}".format(c, min(mappings[c].values()), max(mappings[c].values())))  # some logging to confirm the indexes.
        logging.info("Missing value = {}".format(mappings[c]['missing']))
    return max_index, mappings

I’d love to see the StringIndexer + OneHotEncoder transformers cope with 
missing values during prediction; for now I’ll work with the hacked stuff above 
:).
(.. and I should compare the performance with using the hashing trick.)

Ben

> On Aug 4, 2016, at 3:44 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> 
> Sure, I understand there are some issues with handling this missing value 
> situation in StringIndexer currently. Your workaround is not ideal but I see 
> that it is probably the only mechanism available currently to avoid the 
> problem.
> 
> But the OOM issues seem to be more about the feature cardinality (so the size 
> of the hashmap to store the feature <-> index mappings).
> 
> A nice property of feature hashing is that it implicitly handles unseen 
> category labels by setting the coefficient value to 0 (in the absence of a 
> hash collision) - basically option 2 from H2O.
> 
> Why is that? Well once you've trained your model you have a (sparse) 
> N-dimensional weight vector that will by definition have 0s for unseen 
> indexes. At test time, any feature that only appears in your test set or new 
> data will be hashed to an index in the weight vector that has value 0.
> 
> So it could be useful for both of your problems.
> 
> On Thu, 4 Aug 2016 at 15:25 Ben Teeuwen <bteeu...@gmail.com> wrote:
> Hi Nick, 
> 
> Thanks for the suggestion. Reducing the dimension

Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
When you read both ‘a’ and ‘b', can you try repartitioning both by column ‘id’? 
If you .cache() and .count() to force a shuffle, it'll push the records that 
will be joined to the same executors. 

So;
a = spark.read.parquet(‘path_to_table_a’).repartition(‘id’).cache()
a.count()

b = spark.read.parquet(‘path_to_table_b').repartition(‘id’).cache()
b.count()

And then join..


> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab  wrote:
> 
> Hello,
> We have two parquet inputs of the following form:
> 
> a: id:String, Name:String  (1.5TB)
> b: id:String, Number:Int  (1.3GB)
> 
> We need to join these two to get (id, Number, Name). We've tried two 
> approaches:
> 
> a.join(b, Seq("id"), "right_outer")
> 
> where a and b are dataframes. We also tried taking the rdds, mapping them to 
> pair rdds with id as the key, and then joining. What we're seeing is that 
> temp file usage is increasing on the join stage, and filling up our disks, 
> causing the job to crash. Is there a way to join these two data sets without 
> well...crashing?
> 
> Note, the ids are unique, and there's a one to one mapping between the two 
> datasets. 
> 
> Any help would be appreciated.
> 
> -Ashic. 



Re: registering udf to use in spark.sql('select...

2016-08-04 Thread Ben Teeuwen
Yes, but I don’t want to use it in a select() call. 
Either selectExpr() or spark.sql(), with the udf being called inside a string.

Now I got it to work using
"sqlContext.registerFunction('encodeOneHot_udf', encodeOneHot, VectorUDT())".
But this sqlContext approach will disappear, right? So I'm curious what to use 
instead.
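
For reference, a sketch of what seems to be the non-deprecated route in 2.0: the 
same registerFunction is exposed on the session's catalog. encodeOneHot and 
VectorUDT are assumed to be defined/imported as in the earlier mails, and the 
table/column names below are placeholders.

from pyspark.ml.linalg import VectorUDT

spark.catalog.registerFunction('encodeOneHot_udf', encodeOneHot, VectorUDT())
df.createOrReplaceTempView('mytable')
spark.sql("select encodeOneHot_udf(bigfeature, 'bigfeature') as ohe from mytable")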

> On Aug 4, 2016, at 3:54 PM, Nicholas Chammas <nicholas.cham...@gmail.com> 
> wrote:
> 
> Have you looked at pyspark.sql.functions.udf and the associated examples?
> On Thu, Aug 4, 2016 at 9:10 AM, Ben Teeuwen <bteeu...@gmail.com> wrote:
> Hi,
> 
> I’d like to use a UDF in pyspark 2.0. As in ..
>  
> 
> def squareIt(x):
>   return x * x
> 
> # register the function and define return type
> ….
> 
> spark.sql(“”"select myUdf(adgroupid, 'extra_string_parameter') as 
> function_result from df’)
> 
> _
> 
> How can I register the function? I only see registerFunction in the 
> deprecated sqlContext at 
> http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html.
> As the ‘spark’ object unifies hiveContext and sqlContext, what is the new way 
> to go?
> 
> Ben



Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Ben Teeuwen
DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
16/08/04 10:36:03 WARN DFSClient: Error Recovery for block 
BP-292564-10.196.101.2-1366289936494:blk_2802150425_1105993467488 in 
pipeline 10.10.66.3:50010, 10.10.66.1:50010, 10.10.95.29:50010: bad datanode 
10.10.95.29:50010
16/08/04 10:40:48 WARN DFSClient: Slow ReadProcessor read fields took 60891ms 
(threshold=3ms); ack: seqno: -2 status:



After 40 minutes or so, with no activity in the application master, it dies.

Ben

> On Aug 4, 2016, at 12:14 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
> 
> Hi Ben
> 
> Perhaps with this size cardinality it is worth looking at feature hashing for 
> your problem. Spark has the HashingTF transformer that works on a column of 
> "sentences" (i.e. [string]).
> 
> For categorical features you can hack it a little by converting your feature 
> value into a ["feature_name=feature_value"] representation. Then HashingTF 
> can be used as is. Note you can also just do ["feature_value"], but the 
> former would allow you, with a bit of munging, to hash all your feature 
> columns at the same time.
> 
> The advantage is speed and bounded memory footprint. The disadvantages 
> include (i) no way to reverse the mapping from feature_index -> feature_name; 
> (ii) potential for hash collisions (can be helped a bit by increasing your 
> feature vector size).
> 
> Here is a minimal example:
> 
> In [1]: from pyspark.ml.feature import StringIndexer, OneHotEncoder, HashingTF
> In [2]: from pyspark.sql.types import StringType, ArrayType
> In [3]: from pyspark.sql.functions import udf
> 
> In [4]: df = spark.createDataFrame([(0, "foo"), (1, "bar"), (2, "foo"), (3, 
> "baz")], ["id", "feature"])
> 
> In [5]: to_array = udf(lambda s: ["feature=%s" % s], ArrayType(StringType()))
> 
> In [6]: df = df.withColumn("features", to_array("feature"))
> 
> In [7]: df.show()
> +---+-------+-------------+
> | id|feature|     features|
> +---+-------+-------------+
> |  0|    foo|[feature=foo]|
> |  1|    bar|[feature=bar]|
> |  2|    foo|[feature=foo]|
> |  3|    baz|[feature=baz]|
> +---+-------+-------------+
> 
> In [8]: indexer = StringIndexer(inputCol="feature", outputCol="feature_index")
> 
> In [9]: indexed = indexer.fit(df).transform(df)
> 
> In [10]: encoder = OneHotEncoder(dropLast=False, inputCol="feature_index", 
> outputCol="feature_vector")
> 
> In [11]: encoded = encoder.transform(indexed)
> 
> In [12]: encoded.show()
> +---+-------+-------------+-------------+--------------+
> | id|feature|     features|feature_index|feature_vector|
> +---+-------+-------------+-------------+--------------+
> |  0|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
> |  1|    bar|[feature=bar]|          2.0| (3,[2],[1.0])|
> |  2|    foo|[feature=foo]|          0.0| (3,[0],[1.0])|
> |  3|    baz|[feature=baz]|          1.0| (3,[1],[1.0])|
> +---+-------+-------------+-------------+--------------+
> 
> In [22]: hasher = HashingTF(numFeatures=2**8, inputCol="features", 
> outputCol="features_vector")
> 
> In [23]: hashed = hasher.transform(df)
> 
> In [24]: hashed.show()
> +---+-------+-------------+-----------------+
> | id|feature|     features|  features_vector|
> +---+-------+-------------+-----------------+
> |  0|    foo|[feature=foo]| (256,[59],[1.0])|
> |  1|    bar|[feature=bar]|(256,[219],[1.0])|
> |  2|    foo|[feature=foo]| (256,[59],[1.0])|
> |  3|    baz|[feature=baz]| (256,[38],[1.0])|
> +---+-------+-------------+-----------------+
> 
> On Thu, 4 Aug 2016 at 10:07 Ben Teeuwen <bteeu...@gmail.com> wrote:
> I raised driver memory to 30G and maxresultsize to 25G, this time in pyspark. 
> 
> Code run:
> 
> cat_int  = ['bigfeature']
> 
> stagesIndex = []
> stagesOhe   = []
> for c in cat_int:
>   stagesIndex.append(StringIndexer(inputCol=c, outputCol="{}Index".format(c)))
>   stagesOhe.append(OneHotEncoder(dropLast=False, inputCol="{}Index".format(c), outputCol="{}OHE".format(c)))
> 
> df2 = df
> 
> for i in range(len(stagesIndex)):
>   logging.info("Starting with {}".format(cat_int[i]))
>   stagesIndex[i].fit(df2)
>   logging.info("Fitted. Now transforming:")
>   df2 = stagesIndex[i].fit(df2).transform(df2)
>   logging.info("Transformed. Now showing transformed:")
>   df2.show()
>   logging.info

registering udf to use in spark.sql('select...

2016-08-04 Thread Ben Teeuwen
Hi,

I’d like to use a UDF in pyspark 2.0. As in ..
 

def squareIt(x):
  return x * x

# register the function and define return type
….

spark.sql(“”"select myUdf(adgroupid, 'extra_string_parameter') as 
function_result from df’)

_

How can I register the function? I only see registerFunction in the deprecated 
sqlContext at http://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html.
As the ‘spark’ object unifies hiveContext and sqlContext, what is the new way 
to go?

Ben

Re: OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-04 Thread Ben Teeuwen
 
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 
120 seconds
... 8 more
16/08/04 09:10:45 WARN TransportResponseHandler: Ignoring response for RPC 
485672840406395 from /10.10.80.4:59931 (47 bytes) since it is not 
outstanding
2016-08-04 09:12:07,016 INFO   Transformed. Now showing transformed:
16/08/04 09:13:48 WARN DFSClient: Slow ReadProcessor read fields took 71756ms 
(threshold=3ms); ack: seqno: -2 status: SUCCESS status: ERROR 
downstreamAckTimeNanos: 0, targets: [10.10.66.5:50010, 10.10.10.12:50010, 
10.10.91.9:50010]
16/08/04 09:13:48 WARN DFSClient: DFSOutputStream ResponseProcessor exception  
for block BP-292564-10.196.101.2-1366289936494:blk_2801951315_1105993181265
java.io.IOException: Bad response ERROR for block 
BP-292564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 from 
datanode 10.10.10.12:50010
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer$ResponseProcessor.run(DFSOutputStream.java:897)
16/08/04 09:13:48 WARN DFSClient: Error Recovery for block 
BP-292564-10.196.101.2-1366289936494:blk_2801951315_1105993181265 in 
pipeline 10.10.66.5:50010, 10.10.10.12:50010, 10.192.91.9:50010: bad datanode 
10.10.10.12:50010
Traceback (most recent call last):
  File "", line 7, in 
  File "/opt/spark/2.0.0/python/pyspark/sql/dataframe.py", line 287, in show
print(self._jdf.showString(n, truncate))
  File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py", 
line 933, in __call__
  File "/opt/spark/2.0.0/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
  File "/opt/spark/2.0.0/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py", line 
312, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.showString.
: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
at 
java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1785)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1188)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2037)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:798)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:797)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:797)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:240)
at 
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:323)
at 
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at 
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2183)
    at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)

Ben

> On Aug 3, 2016, at 4:00 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>

Re: how to debug spark app?

2016-08-04 Thread Ben Teeuwen
Related question: what are good profiling tools, other than watching the 
application master while the code runs?
Are there things that can be logged during the run? If I have, say, 2 ways of 
accomplishing the same thing and I want to compare the time / memory / general 
resource-blocking behaviour of both, what is the best way of doing that? 
Something like what tic, toc does in Matlab, or profile on / profile report.
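
A minimal tic/toc-style helper along those lines, as a sketch (the DataFrame df 
and the two "approaches" are just placeholders; remember to end each timed block 
with an action such as .count(), since Spark transformations are lazy):

import time
import logging
from contextlib import contextmanager

@contextmanager
def timed(label):
    start = time.time()
    yield
    logging.info("%s took %.1fs", label, time.time() - start)

with timed("approach A"):
    df_a = df.repartition('id').cache()
    df_a.count()

with timed("approach B"):
    df_b = df.repartition(200, 'id').cache()
    df_b.count()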

> On Aug 4, 2016, at 3:19 AM, Sumit Khanna  wrote:
> 
> Am not really sure of the best practices on this , but I either consult the 
> localhost:4040/jobs/ etc 
> or better this :
> 
> val customSparkListener: CustomSparkListener = new CustomSparkListener()
> sc.addSparkListener(customSparkListener)
> class CustomSparkListener extends SparkListener {
>  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>   debug(s"application ended at time : ${applicationEnd.time}")
>  }
>  override def onApplicationStart(applicationStart: 
> SparkListenerApplicationStart): Unit ={
>   debug(s"[SPARK LISTENER DEBUGS] application Start app attempt id : 
> ${applicationStart.appAttemptId}")
>   debug(s"[SPARK LISTENER DEBUGS] application Start app id : 
> ${applicationStart.appId}")
>   debug(s"[SPARK LISTENER DEBUGS] application start app name : 
> ${applicationStart.appName}")
>   debug(s"[SPARK LISTENER DEBUGS] applicaton start driver logs : 
> ${applicationStart.driverLogs}")
>   debug(s"[SPARK LISTENER DEBUGS] application start spark user : 
> ${applicationStart.sparkUser}")
>   debug(s"[SPARK LISTENER DEBUGS] application start time : 
> ${applicationStart.time}")
>  }
>  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
> Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] ${executorAdded.executorId}")
>   debug(s"[SPARK LISTENER DEBUGS] ${executorAdded.executorInfo}")
>   debug(s"[SPARK LISTENER DEBUGS] ${executorAdded.time}")
>  }
>  override  def onExecutorRemoved(executorRemoved: 
> SparkListenerExecutorRemoved): Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] the executor removed Id : 
> ${executorRemoved.executorId}")
>   debug(s"[SPARK LISTENER DEBUGS] the executor removed reason : 
> ${executorRemoved.reason}")
>   debug(s"[SPARK LISTENER DEBUGS] the executor temoved at time : 
> ${executorRemoved.time}")
>  }
> 
>  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] job End id : ${jobEnd.jobId}")
>   debug(s"[SPARK LISTENER DEBUGS] job End job Result : ${jobEnd.jobResult}")
>   debug(s"[SPARK LISTENER DEBUGS] job End time : ${jobEnd.time}")
>  }
>  override def onJobStart(jobStart: SparkListenerJobStart) {
>   debug(s"[SPARK LISTENER DEBUGS] Job started with properties 
> ${jobStart.properties}")
>   debug(s"[SPARK LISTENER DEBUGS] Job started with time ${jobStart.time}")
>   debug(s"[SPARK LISTENER DEBUGS] Job started with job id 
> ${jobStart.jobId.toString}")
>   debug(s"[SPARK LISTENER DEBUGS] Job started with stage ids 
> ${jobStart.stageIds.toString()}")
>   debug(s"[SPARK LISTENER DEBUGS] Job started with stages 
> ${jobStart.stageInfos.size} : $jobStart")
>  }
> 
>  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): 
> Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] Stage ${stageCompleted.stageInfo.stageId} 
> completed with ${stageCompleted.stageInfo.numTasks} tasks.")
>   debug(s"[SPARK LISTENER DEBUGS] Stage details : 
> ${stageCompleted.stageInfo.details.toString}")
>   debug(s"[SPARK LISTENER DEBUGS] Stage completion time : 
> ${stageCompleted.stageInfo.completionTime}")
>   debug(s"[SPARK LISTENER DEBUGS] Stage details : 
> ${stageCompleted.stageInfo.rddInfos.toString()}")
>  }
>  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): 
> Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] Stage properties : 
> ${stageSubmitted.properties}")
>   debug(s"[SPARK LISTENER DEBUGS] Stage rddInfos : 
> ${stageSubmitted.stageInfo.rddInfos.toString()}")
>   debug(s"[SPARK LISTENER DEBUGS] Stage submission Time : 
> ${stageSubmitted.stageInfo.submissionTime}")
>   debug(s"[SPARK LISTENER DEBUGS] Stage submission details : 
> ${stageSubmitted.stageInfo.details.toString()}")
>  }
>  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] task ended reason : ${taskEnd.reason}")
>   debug(s"[SPARK LISTENER DEBUGS] task type : ${taskEnd.taskType}")
>   debug(s"[SPARK LISTENER DEBUGS] task Metrics : ${taskEnd.taskMetrics}")
>   debug(s"[SPARK LISTENER DEBUGS] task Info : ${taskEnd.taskInfo}")
>   debug(s"[SPARK LISTENER DEBUGS] task stage Id : ${taskEnd.stageId}")
>   debug(s"[SPARK LISTENER DEBUGS] task stage attempt Id : 
> ${taskEnd.stageAttemptId}")
>   debug(s"[SPARK LISTENER DEBUGS] task ended reason : ${taskEnd.reason}")
>  }
>  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
>   debug(s"[SPARK LISTENER DEBUGS] stage Attempt id : 
> ${taskStart.stageAttemptId}")
>   debug(s"[SPARK 

OOM with StringIndexer, 800m rows & 56m distinct value column

2016-08-03 Thread Ben Teeuwen
Hi,

I want to one hot encode a column containing 56 million distinct values. My 
dataset is 800m rows + 17 columns.
I first apply a StringIndexer, but it already breaks there giving a OOM java 
heap space error.

I launch my app on YARN with:
/opt/spark/2.0.0/bin/spark-shell --executor-memory 10G --num-executors 128 
--executor-cores 2 --driver-memory 12G --conf spark.driver.maxResultSize=8G

After grabbing the data, I run:

val catInts = Array("bigfeature")

val stagesIndex = scala.collection.mutable.ArrayBuffer.empty[StringIndexer]
val stagesOhe = scala.collection.mutable.ArrayBuffer.empty[OneHotEncoder]
for (c <- catInts) {
  println(s"starting with $c")
  val i = new StringIndexer()
.setInputCol(c)
.setOutputCol(s"${c}Index")
  stagesIndex += i

  val o = new OneHotEncoder()
.setDropLast(false)
.setInputCol(s"${c}Index")
.setOutputCol(s"${c}OHE")
  stagesOhe += o
}

println(s"Applying string indexers: fitting")
val pipelined = new Pipeline().setStages(stagesIndex.toArray)
val dfFitted = pipelined.fit(df)


Then, the application master shows a "countByValue at StringIndexer.scala” 
taking 1.8 minutes (so very fast). 
Afterwards, the shell console hangs for a while. What is it doing now? After 
some time, it shows:

scala> val dfFitted = pipelined.fit(df)
  
java.lang.OutOfMemoryError: Java heap space
  at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
  at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
  at 
org.apache.spark.util.collection.OpenHashMap$$anonfun$1.apply$mcVI$sp(OpenHashMap.scala:159)
  at org.apache.spark.util.collection.OpenHashSet.rehash(OpenHashSet.scala:230)
  at 
org.apache.spark.util.collection.OpenHashSet.rehashIfNeeded(OpenHashSet.scala:167)
  at 
org.apache.spark.util.collection.OpenHashMap$mcD$sp.update$mcD$sp(OpenHashMap.scala:86)
  at 
org.apache.spark.ml.feature.StringIndexerModel.<init>(StringIndexer.scala:137)
  at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:93)
  at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:66)
  at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149)
  at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:145)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
  at 
scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44)
  at 
scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37)
  at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:145)
  ... 16 elided





Materializing mapWithState .stateSnapshot() after ssc.stop

2016-07-28 Thread Ben Teeuwen
Hi all,

I’ve posted a question regarding sessionizing events using scala and 
mapWithState at 
http://stackoverflow.com/questions/38541958/materialize-mapwithstate-statesnapshots-to-database-for-later-resume-of-spark-st.

The context is to be able to pause and resume a streaming app while not losing 
the state information. So I want to save and reload (initialize) the state 
snapshot. Has anyone of you already been able to do this?

Thanks,

Ben