Writing protobuf RDD to parquet

2023-01-20 Thread David Diebold
Hello,

I'm trying to write some RDD[T], where T is a protobuf message, to parquet
in Scala.
I am wondering what the best option is to do this, and I would be
interested in your insights.
So far, I see two possibilities:
- use the PairRDD method *saveAsNewAPIHadoopFile*, and I guess I need to
call *ParquetOutputFormat.setWriteSupportClass* and
*ProtoParquetOutputFormat.setProtobufClass* beforehand. But in that case,
I'm not sure I have much control over how files are partitioned into
different folders on the file system.
- or convert the RDD to a dataframe and then use *write.parquet*; in that
case I have more control, since I can rely on *partitionBy* to arrange the
files in different folders. But I'm not sure there is a built-in way to
convert an RDD of protobuf to a dataframe in Spark? It seems I would need
to rely on this: https://github.com/saurfang/sparksql-protobuf.

What do you think ?
Kind regards,
David


Question about bucketing and custom partitioners

2022-04-11 Thread David Diebold
Hello,

I have a few questions related to bucketing and custom partitioning in
dataframe api.

I am considering bucketing to perform a shuffle-free join (on one side) in
incremental jobs, but there is one thing that I'm not happy with.
Data is likely to grow/skew over time. At some point, I would need to
change the number of buckets, which would provoke a shuffle.

Instead of this, I would like to use a custom partitioner that would
replace the shuffle with a narrow transformation.
That is something that was feasible with the RDD developer API. For example,
I could use a partitioning scheme such as:
partition_id = (nb_partitions - 1) * (hash(column) - Int.MinValue) / (Int.MaxValue - Int.MinValue)
When I multiply the number of partitions by 2, each new partition depends
only on one partition from the parent (=> narrow transformation).
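For illustration, a rough sketch of that scheme with the PySpark RDD API
(the 32-bit folding and all names are assumptions, untested):

def range_like_partitioner(nb_partitions):
    int_min, int_max = -(2 ** 31), 2 ** 31 - 1

    def partition_id(key):
        h = (hash(key) % (2 ** 32)) + int_min   # fold into [Int.MinValue, Int.MaxValue]
        return int((nb_partitions - 1) * (h - int_min) / (int_max - int_min))

    return partition_id

# pair_rdd is assumed to be an RDD of (key, value) tuples.
# Doubling nb_partitions splits each partition in two, so every child
# depends on exactly one parent partition (narrow dependency).
partitioned = pair_rdd.partitionBy(16, range_like_partitioner(16))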

So, here are my questions :

1/ Is it possible to use a custom partitioner when saving a dataframe with
bucketing?
2/ Still with the dataframe API, is it possible to apply a custom partitioner
to a dataframe?
Is it possible to repartition the dataframe with a narrow
transformation, like what could be done with RDDs?
Is there some sort of dataframe developer API? Do you have any
pointers on this?

Thanks !
David


Re: Question about spark.sql min_by

2022-02-21 Thread David Diebold
Thank you for your answers.
Indeed windowing should help there.
Also, I just realized maybe I can try to create a struct column with both
price and sellerId and apply min() on it; the ordering would consider price
first (https://stackoverflow.com/a/52669177/2015762).
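A minimal sketch of that idea (untested; column names as in the original
question below):

import pyspark.sql.functions as F

# min() on a struct orders by its first field, so putting price first makes
# the struct minimum carry the cheapest seller along with it.
cheapest_sellers_df = (
    df.groupBy("productId")
      .agg(F.min(F.struct("price", "sellerId")).alias("cheapest"))
      .select("productId", "cheapest.price", "cheapest.sellerId")
)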

Cheers!

Le lun. 21 févr. 2022 à 16:12, ayan guha  a écrit :

> Why this can not be done by window function? Or is min by is just a short
> hand?
>
> On Tue, 22 Feb 2022 at 12:42 am, Sean Owen  wrote:
>
>> From the source code, looks like this function was added to pyspark in
>> Spark 3.3, up for release soon. It exists in SQL. You can still use it in
>> SQL with `spark.sql(...)` in Python though, not hard.
>>
>> On Mon, Feb 21, 2022 at 4:01 AM David Diebold 
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to use the spark.sql min_by aggregation function with pyspark.
>>> I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2
>>>
>>> I have a dataframe made of these columns:
>>> - productId : int
>>> - sellerId : int
>>> - price : double
>>>
>>> For each product, I want to get the seller who sells the product for the
>>> cheapest price.
>>>
>>> Naive approach would be to do this, but I would expect two shuffles:
>>>
>>> import spark.sql.functions as F
>>> cheapest_prices_df  =
>>> df.groupby('productId').agg(F.min('price').alias('price'))
>>> cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId',
>>> 'price'])
>>>
>>> I would had loved to do this instead :
>>>
>>> import spark.sql.functions as F
>>> cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
>>> F.min_by('sellerId', 'price'))
>>>
>>> Unfortunately min_by does not seem available in pyspark sql functions,
>>> whereas I can see it in the doc :
>>> https://spark.apache.org/docs/latest/api/sql/index.html
>>>
>>> I have managed to use min_by with this approach but it looks slow (maybe
>>> because of temp table creation ?):
>>>
>>> df.createOrReplaceTempView("table")
>>> cheapest_sellers_df = spark.sql("select min_by(sellerId, price)
>>> sellerId, min(price) from table group by productId")
>>>
>>> Is there a way I can rely on min_by directly in groupby ?
>>> Is there some code missing in pyspark wrapper to make min_by visible
>>> somehow ?
>>>
>>> Thank you in advance for your help.
>>>
>>> Cheers
>>> David
>>>
>> --
> Best Regards,
> Ayan Guha
>


Question about spark.sql min_by

2022-02-21 Thread David Diebold
Hello all,

I'm trying to use the spark.sql min_by aggregation function with pyspark.
I'm relying on this distribution of spark : spark-3.2.1-bin-hadoop3.2

I have a dataframe made of these columns:
- productId : int
- sellerId : int
- price : double

For each product, I want to get the seller who sells the product for the
cheapest price.

A naive approach would be to do this, but I would expect two shuffles:

import spark.sql.functions as F
cheapest_prices_df  =
df.groupby('productId').agg(F.min('price').alias('price'))
cheapest_sellers_df = df.join(cheapest_prices_df, on=['productId', 'price'])

I would have loved to do this instead:

import spark.sql.functions as F
cheapest_sellers_df  = df.groupby('productId').agg(F.min('price'),
F.min_by('sellerId', 'price'))

Unfortunately min_by does not seem to be available in the pyspark sql
functions, whereas I can see it in the docs:
https://spark.apache.org/docs/latest/api/sql/index.html

I have managed to use min_by with this approach but it looks slow (maybe
because of temp table creation ?):

df.createOrReplaceTempView("table")
cheapest_sellers_df = spark.sql("select min_by(sellerId, price) sellerId,
min(price) from table group by productId")
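A variant of the same workaround that stays in the DataFrame API, assuming
the SQL function can be reached through F.expr (untested sketch):

import pyspark.sql.functions as F

# min_by is a SQL function in this Spark version, so expr() can reach it
# even though there is no Python wrapper yet.
cheapest_sellers_df = df.groupby("productId").agg(
    F.expr("min_by(sellerId, price)").alias("sellerId"),
    F.min("price").alias("price"),
)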

Is there a way I can rely on min_by directly in groupby?
Is there some code missing in the pyspark wrapper to make min_by visible
somehow?

Thank you in advance for your help.

Cheers
David


Re: groupMapReduce

2022-01-14 Thread David Diebold
Hello,

In the RDD API, you are probably looking for reduceByKey.
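A minimal sketch of the correspondence (untested, assuming an existing
SparkContext named sc):

# groupMapReduce(key)(map)(reduce) roughly becomes: key the records,
# map the values, then reduceByKey.
words = sc.parallelize(["a", "bb", "a", "ccc"])
totals = (
    words.map(lambda w: (w, len(w)))          # key by word, map to a value
         .reduceByKey(lambda x, y: x + y)     # reduce values per key
)
# totals.collect() returns [('a', 2), ('bb', 2), ('ccc', 3)] in some order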

Cheers

Le ven. 14 janv. 2022 à 11:56, frakass  a écrit :

> Is there a RDD API which is similar to Scala's groupMapReduce?
> https://blog.genuine.com/2019/11/scalas-groupmap-and-groupmapreduce/
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Pyspark debugging best practices

2022-01-03 Thread David Diebold
Hello Andy,

Are you sure you want to perform lots of join operations, and not simple
unions?
Are you doing inner joins or outer joins?
Can you give us a rough idea of your list size plus each individual
dataset size?
Having a look at the execution plan would help; maybe the high number of
join operations makes the execution plan too complicated at the end of the
day. Checkpointing could help there?
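For what it's worth, a minimal sketch of what checkpointing inside the join
loop could look like, reusing the names from your pseudo code below (the
checkpoint directory, join keys and interval are assumptions, untested):

import logging

spark.sparkContext.setCheckpointDir("gs://some-bucket/checkpoints")  # assumed path

df3 = list2OfDF[0]
for i in range(1, len(list2OfDF)):
    df3 = df3.join(list2OfDF[i], on=joinCols)   # joinCols is an assumption
    if i % 10 == 0:
        # checkpoint() materializes df3 and truncates its lineage, so the
        # plan stops growing; the count doubles as a progress marker.
        df3 = df3.checkpoint()
        logging.warning("DEBUG joined %d dataframes, count=%d", i, df3.count())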

Cheers,
David


Le jeu. 30 déc. 2021 à 16:56, Andrew Davidson  a
écrit :

> Hi Gourav
>
> I will give databricks a try.
>
> Each data gets loaded into a data frame.
> I select one column from the data frame
> I join the column to the  accumulated joins from previous data frames in
> the list
>
> To debug, I think I am going to put an action and log statement after each
> join. I do not think it will change the performance. I believe the physical
> plan will be the same; however, hopefully it will shed some light.
>
> At the very least I will know if it making progress or not. And hopefully
> where it is breaking
>
> Happy new year
>
> Andy
>
> On Tue, Dec 28, 2021 at 4:19 AM Gourav Sengupta 
> wrote:
>
>> Hi Andrew,
>>
>> Any chance you might give Databricks a try in GCP?
>>
>> The above transformations look complicated to me, why are you adding
>> dataframes to a list?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Sun, Dec 26, 2021 at 7:00 PM Andrew Davidson 
>> wrote:
>>
>>> Hi
>>>
>>>
>>>
>>> I am having trouble debugging my driver. It runs correctly on smaller
>>> data set but fails on large ones.  It is very hard to figure out what the
>>> bug is. I suspect it may have something do with the way spark is installed
>>> and configured. I am using google cloud platform dataproc pyspark
>>>
>>>
>>>
>>> The log messages are not helpful. The error message will be something
>>> like
>>> "User application exited with status 1"
>>>
>>>
>>>
>>> And
>>>
>>>
>>>
>>> jsonPayload: {
>>>
>>> class: "server.TThreadPoolServer"
>>>
>>> filename: "hive-server2.log"
>>>
>>> message: "Error occurred during processing of message."
>>>
>>> thread: "HiveServer2-Handler-Pool: Thread-40"
>>>
>>> }
>>>
>>>
>>>
>>> I am able to access the spark history server however it does not capture
>>> anything if the driver crashes. I am unable to figure out how to access
>>> spark web UI.
>>>
>>>
>>>
>>> My driver program looks something like the pseudo code below. A long
>>> list of transforms with a single action, (i.e. write) at the end. Adding
>>> log messages is not helpful because of lazy evaluations. I am tempted to
>>> add something like
>>>
>>>
>>>
>>> Logger.warn( “DEBUG df.count():{}”.format( df.count() )” to try and
>>> inline some sort of diagnostic message.
>>>
>>>
>>>
>>> What do you think?
>>>
>>>
>>>
>>> Is there a better way to debug this?
>>>
>>>
>>>
>>> Kind regards
>>>
>>>
>>>
>>> Andy
>>>
>>>
>>>
>>> def run():
>>>
>>> listOfDF = []
>>>
>>> for filePath in listOfFiles:
>>>
>>> df = spark.read.load( filePath, ...)
>>>
>>> listOfDF.append(df)
>>>
>>>
>>>
>>>
>>>
>>> list2OfDF = []
>>>
>>> for df in listOfDF:
>>>
>>> df2 = df.select(  )
>>>
>>> list2OfDF.append( df2 )
>>>
>>>
>>>
>>> # will setting list to None free cache?
>>>
>>> # or just driver memory
>>>
>>> listOfDF = None
>>>
>>>
>>>
>>>
>>>
>>> df3 = list2OfDF[0]
>>>
>>>
>>>
>>> for i in range( 1, len(list2OfDF) ):
>>>
>>> df = list2OfDF[i]
>>>
>>> df3 = df3.join(df ...)
>>>
>>>
>>>
>>> # will setting to list to None free cache?
>>>
>>> # or just driver memory
>>>
>>> list2OfDF = None
>>>
>>>
>>>
>>>
>>>
>>> lots of narrow transformations on d3
>>>
>>>
>>>
>>> return df3
>>>
>>>
>>>
>>> def main() :
>>>
>>> df = run()
>>>
>>> df.write()
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>


question about data skew and memory issues

2021-12-14 Thread David Diebold
Hello all,

I was wondering if it is possible to encounter out-of-memory exceptions on
spark executors when doing some aggregation, when a dataset is skewed.
Let's say we have a dataset with two columns:
- key : int
- value : float
And I want to aggregate values by key.
Let's say that we have tons of keys equal to 0.

Is it possible to encounter some out-of-memory exception after the shuffle?
My expectation would be that the executor responsible for aggregating the
'0' partition could indeed have some OOM exception if it tries to put all
the files of this partition in memory before processing them.
But why would it need to put them in memory when doing an aggregation? It
looks to me like aggregation can be performed in a streaming fashion, so I
would not expect any OOM at all.

Thank you in advance for your insights :)
David


Re: Trying to hash cross features with mllib

2021-10-04 Thread David Diebold
Hello Sean,

Thank you for the heads-up !
Interaction transform won't help for my use case as it returns a vector
that I won't be able to hash.
I will definitely dig further into custom transformations though.

Thanks !
David

Le ven. 1 oct. 2021 à 15:49, Sean Owen  a écrit :

> Are you looking for
> https://spark.apache.org/docs/latest/ml-features.html#interaction ?
> That's the closest built in thing I can think of.  Otherwise you can make
> custom transformations.
>
> On Fri, Oct 1, 2021, 8:44 AM David Diebold 
> wrote:
>
>> Hello everyone,
>>
>> In MLLib, I’m trying to rely essentially on pipelines to create features
>> out of the Titanic dataset, and show-case the power of feature hashing. I
>> want to:
>>
>> -  Apply bucketization on some columns (QuantileDiscretizer is
>> fine)
>>
>> -  Then I want to cross all my columns with each other to have
>> cross features.
>>
>> -  Then I would like to hash all of these cross features into a
>> vector.
>>
>> -  Then give it to a logistic regression.
>>
>> Looking at the documentation, it looks like the only way to hash features
>> is the *FeatureHasher* transformation. It takes multiple columns as
>> input, type can be numeric, bool, string (but no vector/array).
>>
>> But now I’m left wondering how I can create my cross-feature columns. I’m
>> looking at a transformation that could take two columns as input, and
>> return a numeric, bool, or string. I didn't manage to find anything that
>> does the job. There are multiple transformations such as VectorAssembler,
>> that operate on vectors, but this is not a type accepted by the FeatureHasher.
>>
>> Of course, I could try to combine columns directly in my dataframe
>> (before the pipeline kicks-in), but then I would not be able to benefit any
>> more from QuantileDiscretizer and other cool functions.
>>
>>
>> Am I missing something in the transformation api ? Or is my approach to
>> hashing wrong ? Or should we consider to extend the api somehow ?
>>
>>
>>
>> Thank you, kind regards,
>>
>> David
>>
>


Trying to hash cross features with mllib

2021-10-01 Thread David Diebold
Hello everyone,

In MLLib, I’m trying to rely essentially on pipelines to create features
out of the Titanic dataset, and show-case the power of feature hashing. I
want to:

-  Apply bucketization on some columns (QuantileDiscretizer is fine)

-  Then I want to cross all my columns with each other to have
cross features.

-  Then I would like to hash all of these cross features into a
vector.

-  Then give it to a logistic regression.

Looking at the documentation, it looks like the only way to hash features
is the *FeatureHasher* transformation. It takes multiple columns as input,
type can be numeric, bool, string (but no vector/array).

But now I’m left wondering how I can create my cross-feature columns. I’m
looking for a transformation that could take two columns as input and
return a numeric, bool, or string. I didn't manage to find anything that
does the job. There are multiple transformations, such as VectorAssembler,
that operate on vectors, but this is not a type accepted by the FeatureHasher.

Of course, I could try to combine columns directly in my dataframe (before
the pipeline kicks in), but then I would not be able to benefit anymore
from QuantileDiscretizer and other cool functions.
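For illustration, a rough, untested sketch of that "combine the columns
directly" idea (the column names are made-up Titanic-style assumptions):

import pyspark.sql.functions as F
from pyspark.ml.feature import FeatureHasher

# Build string cross-features by concatenating column pairs, then hash them.
crossed = (
    df.withColumn("pclass_str", F.col("Pclass").cast("string"))
      .withColumn("pclass_x_sex", F.concat_ws("_", "pclass_str", "Sex"))
      .withColumn("pclass_x_embarked", F.concat_ws("_", "pclass_str", "Embarked"))
)

hasher = FeatureHasher(
    inputCols=["pclass_x_sex", "pclass_x_embarked"],
    outputCol="features",
    numFeatures=2 ** 12,
)
hashed = hasher.transform(crossed)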


Am I missing something in the transformation API? Or is my approach to
hashing wrong? Or should we consider extending the API somehow?



Thank you, kind regards,

David


Re: Performance of PySpark jobs on the Kubernetes cluster

2021-08-11 Thread David Diebold
Hi Mich,

I don't quite understand why the driver node is using so much CPU, but it
may be unrelated to your executors being underused.
About your executors being underused, I would check that your job generated
enough tasks.
Then I would check the spark.executor.cores and spark.task.cpus parameters to
see if I can give more work to the executors.
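For reference, a minimal sketch of setting those two (the values are
illustrative assumptions, not recommendations):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.cores", "2")   # cores available to each executor
    .config("spark.task.cpus", "1")        # cores reserved by each task
    .getOrCreate()
)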

Cheers,
David



Le mar. 10 août 2021 à 12:20, Khalid Mammadov  a
écrit :

> Hi Mich
>
> I think you need to check your code.
> If code does not use PySpark API effectively you may get this. I.e. if you
> use pure Python/pandas api rather than Pyspark i.e.
> transform->transform->action. e.g df.select(..).withColumn(...)...count()
>
> Hope this helps to put you on right direction.
>
> Cheers
> Khalid
>
>
>
>
> On Mon, 9 Aug 2021, 20:20 Mich Talebzadeh, 
> wrote:
>
>> Hi,
>>
>> I have a basic question to ask.
>>
>> I am running a Google k8s cluster (AKA GKE) with three nodes each having
>> configuration below
>>
>> e2-standard-2 (2 vCPUs, 8 GB memory)
>>
>>
>> spark-submit is launched from another node (actually a data proc single
>> node that I have just upgraded to e2-custom (4 vCPUs, 8 GB mem). We call
>> this the launch node
>>
>> OK I know that the cluster is not much but Google was complaining about
>> the launch node hitting 100% cpus. So I added two more cpus to it.
>>
>> It appears that despite using k8s as the computational cluster, the
>> burden falls upon the launch node!
>>
>> The cpu utilisation for launch node shown below
>>
>> [image: image.png]
>> The dip is when 2 more cpus were added to  it so it had to reboot. so
>> around %70 usage
>>
>> The combined cpu usage for GKE nodes is shown below:
>>
>> [image: image.png]
>>
>> Never goes above 20%!
>>
>> I can see that the drive and executors as below:
>>
>> k get pods -n spark
>> NAME READY   STATUSRESTARTS
>>  AGE
>> pytest-c958c97b2c52b6ed-driver   1/1 Running   0
>> 69s
>> randomdatabigquery-e68a8a7b2c52f468-exec-1   1/1 Running   0
>> 51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-2   1/1 Running   0
>> 51s
>> randomdatabigquery-e68a8a7b2c52f468-exec-3   0/1 Pending   0
>> 51s
>>
>> It is a PySpark 3.1.1 image using java 8 and pushing random data
>> generated into Google BigQuery data warehouse. The last executor (exec-3)
>> seems to be just pending. The spark-submit is as below:
>>
>> spark-submit --verbose \
>>--properties-file ${property_file} \
>>--master k8s://https://$KUBERNETES_MASTER_IP:443 \
>>--deploy-mode cluster \
>>--name pytest \
>>--conf
>> spark.yarn.appMasterEnv.PYSPARK_PYTHON=./pyspark_venv/bin/python \
>>--py-files $CODE_DIRECTORY/DSBQ.zip \
>>--conf spark.kubernetes.namespace=$NAMESPACE \
>>--conf spark.executor.memory=5000m \
>>--conf spark.network.timeout=300 \
>>--conf spark.executor.instances=3 \
>>--conf spark.kubernetes.driver.limit.cores=1 \
>>--conf spark.driver.cores=1 \
>>--conf spark.executor.cores=1 \
>>--conf spark.executor.memory=2000m \
>>--conf spark.kubernetes.driver.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.executor.docker.image=${IMAGEGCP} \
>>--conf spark.kubernetes.container.image=${IMAGEGCP} \
>>--conf
>> spark.kubernetes.authenticate.driver.serviceAccountName=spark-bq \
>>--conf
>> spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" \
>>--conf
>> spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true"
>> \
>>--conf spark.sql.execution.arrow.pyspark.enabled="true" \
>>$CODE_DIRECTORY/${APPLICATION}
>>
>> Aren't the driver and executors running on K8s cluster? So why is the
>> launch node heavily used but k8s cluster is underutilized?
>>
>> Thanks
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>


Missing stack function from SQL functions API

2021-06-14 Thread david . szakallas
I noticed that the stack SQL function is missing from the functions
API. Could we add it?
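In the meantime it is reachable through a SQL expression; a minimal,
untested PySpark sketch (the dataframe is only illustrative):

# stack(n, k1, v1, k2, v2, ...) unpivots columns into rows; it is exposed
# as a SQL function only, hence selectExpr (or expr).
df = spark.createDataFrame([(1, 10, 20)], ["id", "a", "b"])
long_df = df.selectExpr("id", "stack(2, 'a', a, 'b', b) as (col_name, value)")
# long_df rows: (1, 'a', 10) and (1, 'b', 20)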


RE: FW: Email to Spark Org please

2021-04-01 Thread Williams, David (Risk Value Stream)
Classification: Public

Many thanks for the info.  So you wouldn't use sklearn with Spark for large
datasets, but would use it with smaller datasets, relying on hyperopt to build
models in parallel for hyperparameter tuning on Spark?

From: Sean Owen 
Sent: 26 March 2021 13:53
To: Williams, David (Risk Value Stream) 
Cc: user@spark.apache.org
Subject: Re: FW: Email to Spark Org please

-- This email has reached the Bank via an external source --

Right, could also be the case that the overhead of distributing it is just 
dominating.
You wouldn't use sklearn with Spark, just use sklearn at this scale.

What you _can_ use Spark for easily in this case is to distribute parameter
tuning with something like hyperopt. If you're building hundreds of models,
those can be built in parallel with sklearn, and Spark can then drive the model
builds in parallel as part of a process to tune the hyperparams.
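A minimal, untested sketch of that pattern, assuming hyperopt's SparkTrials
backend is available and that X, y fit in memory on each worker:

from hyperopt import fmin, hp, tpe, SparkTrials
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import cross_val_score

search_space = {
    "max_depth": hp.choice("max_depth", [3, 5, 7, 11]),
    "learning_rate": hp.loguniform("learning_rate", -5, 0),
}

def objective(params):
    # Each trial trains a plain sklearn model on a single worker.
    model = GradientBoostingClassifier(**params)
    return -cross_val_score(model, X, y, cv=3).mean()

best = fmin(
    fn=objective,
    space=search_space,
    algo=tpe.suggest,
    max_evals=50,
    trials=SparkTrials(parallelism=8),   # Spark schedules the trials in parallel
)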

On Fri, Mar 26, 2021 at 8:43 AM Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com.invalid>>
 wrote:

Classification: Public

Thanks again Sean.

We did try increasing the partitions but to no avail.  Maybe it's because of 
the low dataset volumes as you say so the overhead is the bottleneck.

If we use sklearn in Spark, we have to make some changes to utilize the 
distributed cluster. So if we get that working in distributed, will we get 
benefits similar to spark ML?

Best Regards,
Dave Williams

From: Sean Owen mailto:sro...@gmail.com>>
Sent: 26 March 2021 13:20
To: Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: FW: Email to Spark Org please

-- This email has reached the Bank via an external source --

Simply because the data set is so small. Anything that's operating entirely in 
memory is faster than something splitting the same data across multiple 
machines, running multiple processes, and incurring all the overhead of sending 
the data and results, combining them, etc.

That said, I suspect that you are not using any parallelism in Spark either. 
You probably have 1 partition, which means at most 1 core is used no matter how 
many are there. Repartition the data set.

On Fri, Mar 26, 2021 at 8:15 AM Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com.invalid>>
 wrote:

Classification: Limited

Many thanks for your response Sean.

Question - why is Spark overkill for this and why is sklearn faster, please?
It's the same algorithm, right?

Thanks again,
Dave Williams

From: Sean Owen mailto:sro...@gmail.com>>
Sent: 25 March 2021 16:40
To: Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: FW: Email to Spark Org please

-- This email has reached the Bank via an external source --

Spark is overkill for this problem; use sklearn.
But I'd suspect that you are using just 1 partition for such a small data set, 
and get no parallelism from Spark.
repartition your input to many more partitions, but, it's unlikely to get much 
faster than in-core sklearn for this task.

On Thu, Mar 25, 2021 at 11:39 AM Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com.invalid>>
 wrote:

Classification: Public

Hi Team,

We are trying to utilize ML Gradient Boosting Tree Classification algorithm and 
found the performance of the algorithm is very poor during training.

We would like to see we can improve the performance timings since, it is taking 
2 days for training for a smaller dataset.

Our dataset size is 4. Number of features used for training is 564.

The same dataset when we use in Sklearn python training is completed in 3 hours 
but when used ML Gradient Boosting it is taking 2 days.

We tried increasing number of executors, executor cores, driver memory etc but 
couldn't see any improvements.

The following are the parameters used for training.

gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag', 
predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01, 
subsamplingRate=0.5, minInstancesPerNode=110)

If you could help us with any suggestions to tune this,  that will be really 
helpful

Many thanks,
Dave Williams

Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC95000. Telephone: 0131 225 4555.

Lloyds Bank plc. Registered Office: 25 Gresham Street, London EC2V 7HN. 
Registered in England and Wales no. 2065. Telephone 0207626 1500.

Bank of Scotland plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC327000. Telephone: 03457 801 801.

Lloyds Bank Corporate Markets plc. Registered office: 25 Gresham Street, London 
EC2V 7HN. Registered in England and Wales no. 10399850.

Scottish Widows Schroder Personal Wealth Limited. Registered Office: 25 Gresham 
Street, London EC2V 7HN. Registered in 

RE: FW: Email to Spark Org please

2021-03-26 Thread Williams, David (Risk Value Stream)
Classification: Public

Thanks again Sean.

We did try increasing the partitions but to no avail.  Maybe it's because of 
the low dataset volumes as you say so the overhead is the bottleneck.

If we use sklearn in Spark, we have to make some changes to utilize the 
distributed cluster. So if we get that working in distributed, will we get 
benefits similar to spark ML?

Best Regards,
Dave Williams

From: Sean Owen 
Sent: 26 March 2021 13:20
To: Williams, David (Risk Value Stream) 
Cc: user@spark.apache.org
Subject: Re: FW: Email to Spark Org please

-- This email has reached the Bank via an external source --

Simply because the data set is so small. Anything that's operating entirely in 
memory is faster than something splitting the same data across multiple 
machines, running multiple processes, and incurring all the overhead of sending 
the data and results, combining them, etc.

That said, I suspect that you are not using any parallelism in Spark either. 
You probably have 1 partition, which means at most 1 core is used no matter how 
many are there. Repartition the data set.

On Fri, Mar 26, 2021 at 8:15 AM Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com.invalid>>
 wrote:

Classification: Limited

Many thanks for your response Sean.

Question - why is Spark overkill for this and why is sklearn faster, please?
It's the same algorithm, right?

Thanks again,
Dave Williams

From: Sean Owen mailto:sro...@gmail.com>>
Sent: 25 March 2021 16:40
To: Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: FW: Email to Spark Org please

-- This email has reached the Bank via an external source --

Spark is overkill for this problem; use sklearn.
But I'd suspect that you are using just 1 partition for such a small data set, 
and get no parallelism from Spark.
repartition your input to many more partitions, but, it's unlikely to get much 
faster than in-core sklearn for this task.

On Thu, Mar 25, 2021 at 11:39 AM Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com.invalid>>
 wrote:

Classification: Public

Hi Team,

We are trying to utilize ML Gradient Boosting Tree Classification algorithm and 
found the performance of the algorithm is very poor during training.

We would like to see we can improve the performance timings since, it is taking 
2 days for training for a smaller dataset.

Our dataset size is 4. Number of features used for training is 564.

The same dataset when we use in Sklearn python training is completed in 3 hours 
but when used ML Gradient Boosting it is taking 2 days.

We tried increasing number of executors, executor cores, driver memory etc but 
couldn't see any improvements.

The following are the parameters used for training.

gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag', 
predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01, 
subsamplingRate=0.5, minInstancesPerNode=110)

If you could help us with any suggestions to tune this,  that will be really 
helpful

Many thanks,
Dave Williams
Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC95000. Telephone: 0131 225 4555.

Lloyds Bank plc. Registered Office: 25 Gresham Street, London EC2V 7HN. 
Registered in England and Wales no. 2065. Telephone 0207626 1500.

Bank of Scotland plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC327000. Telephone: 03457 801 801.

Lloyds Bank Corporate Markets plc. Registered office: 25 Gresham Street, London 
EC2V 7HN. Registered in England and Wales no. 10399850.

Scottish Widows Schroder Personal Wealth Limited. Registered Office: 25 Gresham 
Street, London EC2V 7HN. Registered in England and Wales no. 11722983.

Lloyds Bank plc, Bank of Scotland plc and Lloyds Bank Corporate Markets plc are 
authorised by the Prudential Regulation Authority and regulated by the 
Financial Conduct Authority and Prudential Regulation Authority.

Scottish Widows Schroder Personal Wealth Limited is authorised and regulated by 
the Financial Conduct Authority.

Lloyds Bank Corporate Markets Wertpapierhandelsbank GmbH is a wholly-owned 
subsidiary of Lloyds Bank Corporate Markets plc. Lloyds Bank Corporate Markets 
Wertpapierhandelsbank GmbH has its registered office at Thurn-und-Taxis Platz 
6, 60313 Frankfurt, Germany. The company is registered with the Amtsgericht 
Frankfurt am Main, HRB 111650. Lloyds Bank Corporate Markets 
Wertpapierhandelsbank GmbH is supervised by the Bundesanstalt für 
Finanzdienstleistungsaufsicht.

Halifax is a division of Bank of Scotland plc.

HBOS plc. Registered Office: The Mound, Edinburgh EH1 1YZ. Registered in 
Scotland no. SC218813.



This e-mail (including any attachments) is private and confidential and may 
contain privileged material. If you have received this e-mail in error, please

RE: FW: Email to Spark Org please

2021-03-26 Thread Williams, David (Risk Value Stream)
Classification: Limited

Many thanks for your response Sean.

Question - why is Spark overkill for this and why is sklearn faster, please?
It's the same algorithm, right?

Thanks again,
Dave Williams

From: Sean Owen mailto:sro...@gmail.com>>
Sent: 25 March 2021 16:40
To: Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: FW: Email to Spark Org please

-- This email has reached the Bank via an external source --

Spark is overkill for this problem; use sklearn.
But I'd suspect that you are using just 1 partition for such a small data set, 
and get no parallelism from Spark.
repartition your input to many more partitions, but, it's unlikely to get much 
faster than in-core sklearn for this task.

On Thu, Mar 25, 2021 at 11:39 AM Williams, David (Risk Value Stream) 
mailto:david.willi...@lloydsbanking.com.invalid>>
 wrote:

Classification: Public

Hi Team,

We are trying to utilize ML Gradient Boosting Tree Classification algorithm and 
found the performance of the algorithm is very poor during training.

We would like to see we can improve the performance timings since, it is taking 
2 days for training for a smaller dataset.

Our dataset size is 4. Number of features used for training is 564.

The same dataset when we use in Sklearn python training is completed in 3 hours 
but when used ML Gradient Boosting it is taking 2 days.

We tried increasing number of executors, executor cores, driver memory etc but 
couldn't see any improvements.

The following are the parameters used for training.

gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag', 
predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01, 
subsamplingRate=0.5, minInstancesPerNode=110)

If you could help us with any suggestions to tune this,  that will be really 
helpful

Many thanks,
Dave Williams
Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC95000. Telephone: 0131 225 4555.

Lloyds Bank plc. Registered Office: 25 Gresham Street, London EC2V 7HN. 
Registered in England and Wales no. 2065. Telephone 0207626 1500.

Bank of Scotland plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC327000. Telephone: 03457 801 801.

Lloyds Bank Corporate Markets plc. Registered office: 25 Gresham Street, London 
EC2V 7HN. Registered in England and Wales no. 10399850.

Scottish Widows Schroder Personal Wealth Limited. Registered Office: 25 Gresham 
Street, London EC2V 7HN. Registered in England and Wales no. 11722983.

Lloyds Bank plc, Bank of Scotland plc and Lloyds Bank Corporate Markets plc are 
authorised by the Prudential Regulation Authority and regulated by the 
Financial Conduct Authority and Prudential Regulation Authority.

Scottish Widows Schroder Personal Wealth Limited is authorised and regulated by 
the Financial Conduct Authority.

Lloyds Bank Corporate Markets Wertpapierhandelsbank GmbH is a wholly-owned 
subsidiary of Lloyds Bank Corporate Markets plc. Lloyds Bank Corporate Markets 
Wertpapierhandelsbank GmbH has its registered office at Thurn-und-Taxis Platz 
6, 60313 Frankfurt, Germany. The company is registered with the Amtsgericht 
Frankfurt am Main, HRB 111650. Lloyds Bank Corporate Markets 
Wertpapierhandelsbank GmbH is supervised by the Bundesanstalt für 
Finanzdienstleistungsaufsicht.

Halifax is a division of Bank of Scotland plc.

HBOS plc. Registered Office: The Mound, Edinburgh EH1 1YZ. Registered in 
Scotland no. SC218813.



This e-mail (including any attachments) is private and confidential and may 
contain privileged material. If you have received this e-mail in error, please 
notify the sender and delete it (including any attachments) immediately. You 
must not copy, distribute, disclose or use any of the information in it or any 
attachments. Telephone calls may be monitored or recorded.


FW: Email to Spark Org please

2021-03-25 Thread Williams, David (Risk Value Stream)
Classification: Public

Hi Team,

We are trying to utilize the ML Gradient Boosting Tree Classification algorithm and
found that the performance of the algorithm is very poor during training.

We would like to see if we can improve the performance timings, since it is
taking 2 days to train on a smaller dataset.

Our dataset size is 4. Number of features used for training is 564.

The same dataset when we use in Sklearn python training is completed in 3 hours 
but when used ML Gradient Boosting it is taking 2 days.

We tried increasing number of executors, executor cores, driver memory etc but 
couldn't see any improvements.

The following are the parameters used for training.

gbt = GBTClassifier(featuresCol='features', labelCol='bad_flag', 
predictionCol='prediction', maxDepth=11,  maxIter=1, stepSize=0.01, 
subsamplingRate=0.5, minInstancesPerNode=110)

If you could help us with any suggestions to tune this,  that will be really 
helpful

Many thanks,
Dave Williams

Lloyds Banking Group plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC95000. Telephone: 0131 225 4555.

Lloyds Bank plc. Registered Office: 25 Gresham Street, London EC2V 7HN. 
Registered in England and Wales no. 2065. Telephone 0207626 1500.

Bank of Scotland plc. Registered Office: The Mound, Edinburgh EH1 1YZ. 
Registered in Scotland no. SC327000. Telephone: 03457 801 801.

Lloyds Bank Corporate Markets plc. Registered office: 25 Gresham Street, London 
EC2V 7HN. Registered in England and Wales no. 10399850.

Scottish Widows Schroder Personal Wealth Limited. Registered Office: 25 Gresham 
Street, London EC2V 7HN. Registered in England and Wales no. 11722983.

Lloyds Bank plc, Bank of Scotland plc and Lloyds Bank Corporate Markets plc are 
authorised by the Prudential Regulation Authority and regulated by the 
Financial Conduct Authority and Prudential Regulation Authority.

Scottish Widows Schroder Personal Wealth Limited is authorised and regulated by 
the Financial Conduct Authority.

Lloyds Bank Corporate Markets Wertpapierhandelsbank GmbH is a wholly-owned 
subsidiary of Lloyds Bank Corporate Markets plc. Lloyds Bank Corporate Markets 
Wertpapierhandelsbank GmbH has its registered office at Thurn-und-Taxis Platz 
6, 60313 Frankfurt, Germany. The company is registered with the Amtsgericht 
Frankfurt am Main, HRB 111650. Lloyds Bank Corporate Markets 
Wertpapierhandelsbank GmbH is supervised by the Bundesanstalt für 
Finanzdienstleistungsaufsicht.

Halifax is a division of Bank of Scotland plc.

HBOS plc. Registered Office: The Mound, Edinburgh EH1 1YZ. Registered in 
Scotland no. SC218813.



This e-mail (including any attachments) is private and confidential and may 
contain privileged material. If you have received this e-mail in error, please 
notify the sender and delete it (including any attachments) immediately. You 
must not copy, distribute, disclose or use any of the information in it or any 
attachments. Telephone calls may be monitored or recorded.


Re: S3a Committer

2021-02-02 Thread David Morin
Yes, that's true, but this is not (yet) the case for the OpenStack Swift S3
API.

Le mar. 2 févr. 2021 à 21:41, Henoc  a écrit :

> S3 is strongly consistent now
> https://aws.amazon.com/s3/consistency/
>
> Regards,
> Henoc
>
> On Tue, Feb 2, 2021, 10:27 PM David Morin 
> wrote:
>
>> Hi,
>>
>> I have some issues at the moment with S3 API of Openstack Swift (S3a).
>> This one is eventually consistent and it causes lots of issues with my
>> distributed jobs in Spark.
>> Is the S3A committer able to fix that ? Or an "S3guard like"
>> implementation is the only way ?
>>
>> David
>>
>


S3a Committer

2021-02-02 Thread David Morin
Hi,

I have some issues at the moment with the S3 API of OpenStack Swift (S3a).
This one is eventually consistent, and it causes lots of issues with my
distributed jobs in Spark.
Is the S3A committer able to fix that? Or is an "S3Guard-like" implementation
the only way?

David


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Thanks Jungtaek
Ok I got it. I'll test it and check if the loss of efficiency is acceptable.


Le mer. 23 déc. 2020 à 23:29, Jungtaek Lim  a
écrit :

> Please refer my previous answer -
> https://lists.apache.org/thread.html/r7dfc9e47cd9651fb974f97dde756013fd0b90e49d4f6382d7a3d68f7%40%3Cuser.spark.apache.org%3E
> Probably we may want to add it in the SS guide doc. We didn't need it as
> it just didn't work with eventually consistent model, and now it works
> anyway but is very inefficient.
>
>
> On Thu, Dec 24, 2020 at 6:16 AM David Morin 
> wrote:
>
>> Does it work with the standard AWS S3 solution and its new
>> consistency model
>> <https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/>
>> ?
>>
>> Le mer. 23 déc. 2020 à 18:48, David Morin  a
>> écrit :
>>
>>> Thanks.
>>> My Spark applications run on nodes based on docker images but this is a
>>> standalone mode (1 driver - n workers)
>>> Can we use S3 directly with consistency addon like s3guard (s3a) or AWS
>>> Consistent view
>>> <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html>
>>>  ?
>>>
>>> Le mer. 23 déc. 2020 à 17:48, Lalwani, Jayesh  a
>>> écrit :
>>>
>>>> Yes. It is necessary to have a distributed file system because all the
>>>> workers need to read/write to the checkpoint. The distributed file system
>>>> has to be immediately consistent: When one node writes to it, the other
>>>> nodes should be able to read it immediately
>>>>
>>>> The solutions/workarounds depend on where you are hosting your Spark
>>>> application.
>>>>
>>>>
>>>>
>>>> *From: *David Morin 
>>>> *Date: *Wednesday, December 23, 2020 at 11:08 AM
>>>> *To: *"user@spark.apache.org" 
>>>> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints
>>>> fail
>>>>
>>>>
>>>>
>>>> *CAUTION*: This email originated from outside of the organization. Do
>>>> not click links or open attachments unless you can confirm the sender and
>>>> know the content is safe.
>>>>
>>>>
>>>>
>>>> Hello,
>>>>
>>>>
>>>>
>>>> I have an issue with my Pyspark job related to checkpoint.
>>>>
>>>>
>>>>
>>>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>>>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>>>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>>>> java.lang.IllegalStateException: Error reading delta file
>>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>>>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>>>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>>>> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>>>> does not exist*
>>>>
>>>>
>>>>
>>>> This job is based on Spark 3.0.1 and Structured Streaming
>>>>
>>>> This Spark cluster (1 driver and 6 executors) works without hdfs. And
>>>> we don't want to manage an hdfs cluster if possible.
>>>>
>>>> Is it necessary to have a distributed filesystem ? What are the
>>>> different solutions/workarounds ?
>>>>
>>>>
>>>>
>>>> Thanks in advance
>>>>
>>>> David
>>>>
>>>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Does it work with the standard AWS S3 solution and its new consistency model
<https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/>
?

Le mer. 23 déc. 2020 à 18:48, David Morin  a
écrit :

> Thanks.
> My Spark applications run on nodes based on docker images but this is a
> standalone mode (1 driver - n workers)
> Can we use S3 directly with consistency addon like s3guard (s3a) or AWS
> Consistent view
> <https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html>
>  ?
>
> Le mer. 23 déc. 2020 à 17:48, Lalwani, Jayesh  a
> écrit :
>
>> Yes. It is necessary to have a distributed file system because all the
>> workers need to read/write to the checkpoint. The distributed file system
>> has to be immediately consistent: When one node writes to it, the other
>> nodes should be able to read it immediately
>>
>> The solutions/workarounds depend on where you are hosting your Spark
>> application.
>>
>>
>>
>> *From: *David Morin 
>> *Date: *Wednesday, December 23, 2020 at 11:08 AM
>> *To: *"user@spark.apache.org" 
>> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>>
>>
>>
>> *CAUTION*: This email originated from outside of the organization. Do
>> not click links or open attachments unless you can confirm the sender and
>> know the content is safe.
>>
>>
>>
>> Hello,
>>
>>
>>
>> I have an issue with my Pyspark job related to checkpoint.
>>
>>
>>
>> Caused by: org.apache.spark.SparkException: Job aborted due to stage
>> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
>> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
>> java.lang.IllegalStateException: Error reading delta file
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
>> HDFSStateStoreProvider[id = (op=0,part=3),dir =
>> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
>> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
>> does not exist*
>>
>>
>>
>> This job is based on Spark 3.0.1 and Structured Streaming
>>
>> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
>> don't want to manage an hdfs cluster if possible.
>>
>> Is it necessary to have a distributed filesystem ? What are the different
>> solutions/workarounds ?
>>
>>
>>
>> Thanks in advance
>>
>> David
>>
>


Re: Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Thanks.
My Spark applications run on nodes based on docker images but this is a
standalone mode (1 driver - n workers)
Can we use S3 directly with consistency addon like s3guard (s3a) or AWS
Consistent view
<https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-consistent-view.html>
 ?

Le mer. 23 déc. 2020 à 17:48, Lalwani, Jayesh  a
écrit :

> Yes. It is necessary to have a distributed file system because all the
> workers need to read/write to the checkpoint. The distributed file system
> has to be immediately consistent: When one node writes to it, the other
> nodes should be able to read it immediately
>
> The solutions/workarounds depend on where you are hosting your Spark
> application.
>
>
>
> *From: *David Morin 
> *Date: *Wednesday, December 23, 2020 at 11:08 AM
> *To: *"user@spark.apache.org" 
> *Subject: *[EXTERNAL] Spark 3.0.1 Structured streaming - checkpoints fail
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Hello,
>
>
>
> I have an issue with my Pyspark job related to checkpoint.
>
>
>
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
> task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
> java.lang.IllegalStateException: Error reading delta file
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
> HDFSStateStoreProvider[id = (op=0,part=3),dir =
> file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]: 
> *file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
> does not exist*
>
>
>
> This job is based on Spark 3.0.1 and Structured Streaming
>
> This Spark cluster (1 driver and 6 executors) works without hdfs. And we
> don't want to manage an hdfs cluster if possible.
>
> Is it necessary to have a distributed filesystem ? What are the different
> solutions/workarounds ?
>
>
>
> Thanks in advance
>
> David
>


Spark 3.0.1 Structured streaming - checkpoints fail

2020-12-23 Thread David Morin
Hello,

I have an issue with my Pyspark job related to checkpoint.

Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task 3 in stage 16997.0 failed 4 times, most recent failure: Lost
task 3.3 in stage 16997.0 (TID 206609, 10.XXX, executor 4):
java.lang.IllegalStateException: Error reading delta file
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta of
HDFSStateStoreProvider[id = (op=0,part=3),dir =
file:/opt/spark/workdir/query6/checkpointlocation/state/0/3]:
*file:/opt/spark/workdir/query6/checkpointlocation/state/0/3/1.delta
does not exist*

This job is based on Spark 3.0.1 and Structured Streaming
This Spark cluster (1 driver and 6 executors) works without HDFS, and we
don't want to manage an HDFS cluster if possible.
Is it necessary to have a distributed filesystem? What are the different
solutions/workarounds?
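For reference, a minimal, untested sketch of pointing the checkpoint at
shared storage that every executor and the driver can reach (the paths are
assumptions):

query = (
    df.writeStream
      .format("parquet")
      .option("path", "s3a://my-bucket/output/")
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/query6/")
      .start()
)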

Thanks in advance
David


Re: Job is not able to perform Broadcast Join

2020-10-06 Thread David Edwards
After adding the sequential ids you might need a repartition? I've found
before, when using monotonically_increasing_id, that the df ends up in a
single partition. It usually becomes clear in the Spark UI though.
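For reference, a rough, untested sketch of the flow described below, with a
repartition after the broadcast join (join key handling and the partition
count are assumptions):

import pyspark.sql.functions as F
from pyspark.sql import Window

df1 = df.withColumn("mid", F.monotonically_increasing_id())
df2 = df1.select("mid")

# row_number() without partitionBy pulls everything onto one task, which is
# why it is applied to the narrow single-column frame only.
df3 = df2.withColumn("row_id", F.row_number().over(Window.orderBy("mid")))

result = df1.join(F.broadcast(df3), on="mid").repartition(200)  # 200 is arbitrary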

On Tue, 6 Oct 2020, 20:38 Sachit Murarka,  wrote:

> Yes, Even I tried the same first. Then I moved to join method because
> shuffle spill was happening because row num without partition happens on
> single task. Instead of processinf entire dataframe on single task. I have
> broken down that into df1 and df2 and joining.
> Because df2 is having very less data set since it has 2 cols only.
>
> Thanks
> Sachit
>
> On Wed, 7 Oct 2020, 01:04 Eve Liao,  wrote:
>
>> Try to avoid broadcast. Thought this:
>> https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6
>> could be helpful.
>>
>> On Tue, Oct 6, 2020 at 12:18 PM Sachit Murarka 
>> wrote:
>>
>>> Thanks Eve for response.
>>>
>>> Yes I know we can use broadcast for smaller datasets,I increased the
>>> threshold (4Gb) for the same then also it did not work. and the df3 is
>>> somewhat greater than 2gb.
>>>
>>> Trying by removing broadcast as well.. Job is running since 1 hour. Will
>>> let you know.
>>>
>>>
>>> Thanks
>>> Sachit
>>>
>>> On Wed, 7 Oct 2020, 00:41 Eve Liao,  wrote:
>>>
 How many rows does df3 have? Broadcast joins are a great way to append
 data stored in relatively *small* single source of truth data files to
 large DataFrames. DataFrames up to 2GB can be broadcasted so a data file
 with tens or even hundreds of thousands of rows is a broadcast candidate.
 Your broadcast variable is probably too large.

 On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka 
 wrote:

> Hello Users,
>
> I am facing an issue in spark job where I am doing row number()
> without partition by clause because I need to add sequential increasing 
> IDs.
> But to avoid the large spill I am not doing row number() over the
> complete data frame.
>
> Instead I am applying monotically_increasing id on actual data set ,
> then create a new data frame from original data frame which will have
> just monotically_increasing id.
>
> So DF1 = All columns + monotically_increasing_id
> DF2 = Monotically_increasingID
>
> Now I am applying row number() on DF2 since this is a smaller
> dataframe.
>
> DF3 = Monotically_increasingID + Row_Number_ID
>
> Df.join(broadcast(DF3))
>
> This will give me sequential increment id in the original Dataframe.
>
> But below is the stack trace.
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o180.parquet.
> : org.apache.spark.SparkException: Job aborted.
> at
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> at
> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> at
> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at
> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> at
> 

unsubscribe

2020-04-26 Thread David Aspegren
unsubscribe


Re: wot no toggle ?

2020-04-16 Thread David Hesson
You may want to read about the JVM and have some degree of understanding
what you're talking about, and then you'd know that those options have
different meanings. You can view both at the same time, for example.

On Thu, Apr 16, 2020, 2:13 AM jane thorpe 
wrote:

> https://spark.apache.org/docs/3.0.0-preview/web-ui.html#storage-tab
>
> On the link in one of the screen shot there are two  checkboxes.
> ON HEAP MEMORY
> OFF HEAP MEMORY.
>
> That is as useful as a pussy on as Barry Humphries wearing a gold dress
> as Dame Edna average.
>
> Which monkey came up with that ?
> None of the moneys here noticed that ?
>
> Ever heard of a toggle switch.
> look behind you.
> Look at the light switch.
> That is a Toggle switch ON/OFF.
>
>
> Jane thorpe
> janethor...@aol.com
>


Re: Going it alone.

2020-04-14 Thread David Hesson
>
> I want to know  if Spark is headed in my direction.
>
You are implying  Spark could be.


What direction are you headed in, exactly? I don't feel as if anything were
implied when you were asked for use cases or what problem you are solving.
You were asked to identify some use cases, of which you don't appear to
have any.

On Tue, Apr 14, 2020 at 4:49 PM jane thorpe 
wrote:

> That's what  I want to know,  Use Cases.
> I am looking for  direction as I described and I want to know  if Spark is
> headed in my direction.
>
> You are implying  Spark could be.
>
> So tell me about the USE CASES and I'll do the rest.
> --
> On Tuesday, 14 April 2020 yeikel valdes  wrote:
> It depends on your use case. What are you trying to solve?
>
>
>  On Tue, 14 Apr 2020 15:36:50 -0400 * janethor...@aol.com.INVALID *
> wrote 
>
> Hi,
>
> I consider myself to be quite good in Software Development especially
> using frameworks.
>
> I like to get my hands  dirty. I have spent the last few months
> understanding modern frameworks and architectures.
>
> I am looking to invest my energy in a product where I don't have to
> relying on the monkeys which occupy this space  we call software
> development.
>
> I have found one that meets my requirements.
>
> Would Apache Spark be a good Tool for me or  do I need to be a member of a
> team to develop  products  using Apache Spark  ?
>
>
>
>
>
>


Re: Questions about count() performance with dataframes and parquet files

2020-02-12 Thread David Edwards
Hi Ashley,

Apologies, I'm reading this on my phone as my work laptop doesn't let me
access personal email.

Are you actually doing anything with the counts (printing to log, writing
to a table)?

If you're not doing anything with them get rid of them and the caches
entirely.

If you do want to do something with the counts you could try removing the
individual counts and caches.

Put a single cache on the df_output

df_output = df_inserts.union(df_updates).cache()

Then output a count grouped by type on this df before writing out the parquet.
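A minimal, untested sketch of that, using the _action column from your
snippet:

# One aggregation over the cached union gives both counts in a single job.
action_counts = {
    row["_action"]: row["count"]
    for row in df_output.groupBy("_action").count().collect()
}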

Hope that helps

Dave


On Thu, 13 Feb 2020, 06:09 Ashley Hoff,  wrote:

> Thanks David,
>
> I did experiment with the .cache() keyword and have to admit I didn't see
> any marked improvement on the sample that I was running, so yes I am a bit
> apprehensive including it (not even sure why I actually left it in).
>
> When you say "do the count as the final step", are you referring to
> getting the counts of the individual data frames, or from the already
> outputted parquet?
>
> Thanks and I appreciate your reply
>
> On Thu, Feb 13, 2020 at 4:15 PM David Edwards 
> wrote:
>
>> Hi Ashley,
>>
>> I'm not an expert but think this is because spark does lazy execution and
>> doesn't actually perform any actions until you do some kind of write, count
>> or other operation on the dataframe.
>>
>> If you remove the count steps it will work out a more efficient execution
>> plan reducing the number of task steps.
>>
>> if you can do the count as a final step I would do that. I think you may
>> also not need the .cache() statements and you might want to experiment
>> reducing the number spark.sql.shuffle.partitions too.
>>
>> Thanks
>> Dave
>>
>>
>>
>>
>>
>>
>>
>>
>> On Thu, 13 Feb 2020, 04:09 Ashley Hoff,  wrote:
>>
>>> Hi,
>>>
>>> I am currently working on an app using PySpark to produce an insert and
>>> update daily delta capture, being outputted as Parquet.  This is running on
>>> a 8 core 32 GB Linux server in standalone mode (set to 6 worker cores of
>>> 2GB memory each) running Spark 2.4.3.
>>>
>>> This is being achieved by reading in data from a TSQL database, into a
>>> dataframe, which has a hash of all records appended to it and comparing it
>>> to a dataframe from yesterdays data (which has been saved also as
>>> parquet).
>>>
>>> As part of the monitoring and logging, I am trying to count the number
>>> of records for the respective actions.  Example code:
>>>
>>> df_source = spark_session.read.format('jdbc').
>>> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>>>
>>> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', 
>>> *df_source.columns))) \
>>> .cache()
>>>
>>> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
>>> .select(lit('Insert').alias('_action'), 
>>> *df_source_hashed) \
>>> .dropDuplicates() \
>>> .cache()
>>> inserts_count = df_inserts.count()
>>>
>>> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), 
>>> pk_list, how="inner") \
>>> .select(lit('Update').alias('_action'), 
>>> *df_source_hashed) \
>>> .where(col('a.hashkey') != col('b.hashkey')) \
>>> .dropDuplicates() \
>>> .cache()
>>> updates_count = df_updates.count()
>>>
>>> df_output = df_inserts.union(df_updates)
>>>
>>> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>>>
>>> The above code is running two occurrences concurrently via Python
>>> threading.Thread (this is to try and overcome the network bottle neck
>>> connecting to the database server).
>>>
>>> What I am finding is I am getting some very inconsistent behavior with
>>> the counts.  Occasionally, it appears that it will freeze up on a count
>>> operation for a few minutes and quite often that specific data frame will
>>> have zero records in it.  According to the DAG (which I am not 100% sure
>>> how to read) the following is the processing flow:
>>>
>>> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0  =>
>>> WholeStageCodegen/MapPartitionsRDD [75]count at
>>> NativeMethodAccessorImpl.java:0  => InMemo

Re: Questions about count() performance with dataframes and parquet files

2020-02-12 Thread David Edwards
Hi Ashley,

I'm not an expert, but I think this is because Spark does lazy execution and
doesn't actually perform any actions until you do some kind of write, count
or other operation on the dataframe.

If you remove the count steps it will work out a more efficient execution
plan, reducing the number of task steps.

If you can do the count as a final step I would do that. I think you may
also not need the .cache() statements, and you might want to experiment with
reducing spark.sql.shuffle.partitions too.
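
For illustration only, a sketch of the "count as a final step" idea, reusing the
dataframe names and paths from the quoted message below; the shuffle-partition
value is an arbitrary placeholder (the default is 200):

from pyspark.sql.functions import col

spark_session.conf.set('spark.sql.shuffle.partitions', 24)  # placeholder value

df_output = df_inserts.union(df_updates)
df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')

# derive the counts from what was actually written, as the very last step
df_written = spark_session.read.parquet('/path/to/output.parquet')
inserts_count = df_written.filter(col('_action') == 'Insert').count()
updates_count = df_written.filter(col('_action') == 'Update').count()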

Thanks
Dave








On Thu, 13 Feb 2020, 04:09 Ashley Hoff,  wrote:

> Hi,
>
> I am currently working on an app using PySpark to produce an insert and
> update daily delta capture, being outputted as Parquet.  This is running on
> a 8 core 32 GB Linux server in standalone mode (set to 6 worker cores of
> 2GB memory each) running Spark 2.4.3.
>
> This is being achieved by reading in data from a TSQL database, into a
> dataframe, which has a hash of all records appended to it and comparing it
> to a dataframe from yesterdays data (which has been saved also as
> parquet).
>
> As part of the monitoring and logging, I am trying to count the number of
> records for the respective actions.  Example code:
>
> df_source = spark_session.read.format('jdbc').
> df_reference = sql_context.read.parquet('/path/to/reference.parquet')
>
> df_source_hashed = df_source.withColumn('hashkey', md5(concat_ws('', 
> *df_source.columns))) \
> .cache()
>
> df_inserts = df_source_hashed.join(df_reference, pk_list, how='left_anti') \
> .select(lit('Insert').alias('_action'), 
> *df_source_hashed) \
> .dropDuplicates() \
> .cache()
> inserts_count = df_inserts.count()
>
> df_updates = df_source_hashed.alias('a').join(df_reference.alias('b'), 
> pk_list, how="inner") \
> .select(lit('Update').alias('_action'), 
> *df_source_hashed) \
> .where(col('a.hashkey') != col('b.hashkey')) \
> .dropDuplicates() \
> .cache()
> updates_count = df_updates.count()
>
> df_output = df_inserts.union(df_updates)
>
> df_output.repartition(1).write.format('parquet').mode('overwrite').save('/path/to/output.parquet')
>
> The above code is running two occurrences concurrently via Python
> threading.Thread (this is to try and overcome the network bottle neck
> connecting to the database server).
>
> What I am finding is I am getting some very inconsistent behavior with the
> counts.  Occasionally, it appears that it will freeze up on a count
> operation for a few minutes and quite often that specific data frame will
> have zero records in it.  According to the DAG (which I am not 100% sure
> how to read) the following is the processing flow:
>
> Exchange/ShuffledRowRDD [74]count at NativeMethodAccessorImpl.java:0  =>
> WholeStageCodegen/MapPartitionsRDD [75]count at
> NativeMethodAccessorImpl.java:0  => InMemoryTableScan/MapPartitionsRDD
> [78]count at NativeMethodAccessorImpl.java:0 => MapPartitionsRDD [79]count
> at NativeMethodAccessorImpl.java:0 => WholeStageCodegen/MapPartitionsRDD
> [80]count at NativeMethodAccessorImpl.java:0 => Exchange/MapPartitionsRDD
> [81]count at NativeMethodAccessorImpl.java:0
>
> The other observation I have found that if I remove the counts from the
> data frame operations and instead open the outputted parquet field and
> count using a
> `sql_context.read.load('/path/to/output.parquet').filter(col("_action") ==
> "Insert").count()` command, I am reducing my run-times by around 20 to
> 30%.  In my feeble mind, opening up the outputs and re-reading them seems
> counter-intuitive.
>
> Is anyone able to give me some guidance on why or how to ensure that I am
> doing the above as efficiently as possible?
>
> Best Regards
> Ashley
>


unsubscribe

2020-01-03 Thread David Aspegren
unsubscribe


Re: How to use spark-on-k8s pod template?

2019-11-08 Thread David Mitchell
Are you using Spark 2.3 or above?

See the documentation:
https://spark.apache.org/docs/latest/running-on-kubernetes.html

It looks like you do not need:
--conf spark.kubernetes.driver.podTemplateFile='/spark-pod-template.yaml' \
--conf spark.kubernetes.executor.podTemplateFile='/spark-pod-template.yaml'
\

Is your service account and namespace properly setup?

Cluster mode:

$ bin/spark-submit \
--master k8s://https://: \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.executor.instances=5 \
--conf spark.kubernetes.container.image= \
local:///path/to/examples.jar


On Tue, Nov 5, 2019 at 6:37 AM sora  wrote:

> Hi all,
> I am looking for the usage about the spark-on-k8s pod template.
> I want to set some toleration rules for the driver and executor pod.
> I tried to set --conf 
> spark.kubernetes.driver.podTemplateFile=/spark-pod-template.yaml but
> didn't work.
> The driver pod started without the toleration rules and stay pending
> because of no available node.
> Could anyone please show me any usage?
>
> The template file is below.
>
> apiVersion: extensions/v1beta1
> kind: Pod
> spec:
>   template:
> spec:
>   tolerations:
> - effect: NoSchedule
>   key: project
>   operator: Equal
>   value: name
>
>
> My full command is below.
>
> /opt/spark/bin/spark-submit --master 
> k8s://https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_PORT_443_TCP_PORT \
> --conf spark.kubernetes.driver.podTemplateFile='/spark-pod-template.yaml' \
> --conf spark.kubernetes.executor.podTemplateFile='/spark-pod-template.yaml' \
> --conf spark.scheduler.mode=FAIR \
> --conf spark.driver.memory=2g \
> --conf spark.driver.cores=1 \
> --conf spark.executor.cores=1 \
> --conf spark.executor.memory=1g \
> --conf spark.executor.instances=4 \
> --conf spark.kubernetes.container.image=job-image \
> --conf spark.kubernetes.namespace=nc \
> --conf spark.kubernetes.authenticate.driver.serviceAccountName=sa \
> --conf spark.kubernetes.report.interval=5 \
> --conf spark.kubernetes.submission.waitAppCompletion=false \
> --deploy-mode cluster \
> --name job-name \
> --class job.class job.jar job-args
>
>
>
>
>
>
>
>
>
>
>
>

-- 
### Confidential e-mail, for recipient's (or recipients') eyes only, not
for distribution. ###


Re: how to refresh the loaded non-streaming dataframe for each steaming batch ?

2019-09-06 Thread David Zhou
Not yet. Learning spark

On Fri, Sep 6, 2019 at 2:17 PM Shyam P  wrote:

> cool ,but did you find a way or anyhelp or clue ?
>
> On Fri, Sep 6, 2019 at 11:40 PM David Zhou  wrote:
>
>> I have the same question with yours
>>
>> On Thu, Sep 5, 2019 at 9:18 PM Shyam P  wrote:
>>
>>> Hi,
>>>
>>> I am using spark-sql-2.4.1v to streaming in my PoC.
>>>
>>> how to refresh the loaded dataframe from hdfs/cassandra table every time
>>> new batch of stream processed ? What is the practice followed in general to
>>> handle this kind of scenario?
>>>
>>> Below is the SOF link for more details .
>>>
>>>
>>> https://stackoverflow.com/questions/57815645/how-to-refresh-the-contents-of-non-streaming-dataframe
>>>
>>> Thank you,
>>> Shyam
>>>
>>>
>>


Question on streaming job wait and re-run

2019-09-06 Thread David Zhou
Hi,
My streaming job consumes data from kafka and writes them into Cassandra.

Current status:
Cassandra is not stable. The streaming job crashes when it can't write data
into Cassandra. The streaming job has a checkpoint. Usually, the Cassandra
cluster will come back within 4 hours. Then I start the streaming job
again, and it will start from the checkpoint and run.

Expectation:
The streaming job should enter a waiting state (like a thread wait) instead of
crashing, and be notified automatically when Cassandra comes back.

Does Spark Streaming support this? I searched on Google but can't find any
related solutions.

Thanks.
David


Re: how to refresh the loaded non-streaming dataframe for each steaming batch ?

2019-09-06 Thread David Zhou
I have the same question with yours

On Thu, Sep 5, 2019 at 9:18 PM Shyam P  wrote:

> Hi,
>
> I am using spark-sql-2.4.1v to streaming in my PoC.
>
> how to refresh the loaded dataframe from hdfs/cassandra table every time
> new batch of stream processed ? What is the practice followed in general to
> handle this kind of scenario?
>
> Below is the SOF link for more details .
>
>
> https://stackoverflow.com/questions/57815645/how-to-refresh-the-contents-of-non-streaming-dataframe
>
> Thank you,
> Shyam
>
>


Re: Start point to read source codes

2019-09-05 Thread David Zhou
Hi Hichame,
Thanks a lot. I forked it. There is a lot of code. I need documents to guide
me on which part I should start from.

On Thu, Sep 5, 2019 at 1:30 PM Hichame El Khalfi 
wrote:

> Hey David,
>
> You can the source code on GitHub:
> https://github.com/apache/spark
>
> Hope this helps,
>
> Hichame
> *From:* zhou10...@gmail.com
> *Sent:* September 5, 2019 4:11 PM
> *To:* user@spark.apache.org
> *Subject:* Start point to read source codes
>
> Hi,
>
> I want to read the source codes. Is there any doc, wiki or book which
> introduces the source codes.
>
> Thanks in advance.
>
> David
>


Spark 2.4.3 on Kubernetes Client mode fails

2019-05-26 Thread David Aspegren
Hi,

I can run successfully in cluster mode but when trying in client mode the
job won't run.

I have a driver pod created from jupyter/pyspark-notebook that I would
assume is sufficient.

I execute:

bin/spark-submit  \
--master k8s://https://192.168.99.100:8443  \
--deploy-mode client  \
--conf spark.executor.instances=1  \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark  \
--conf spark.kubernetes.container.image=spark:spark-docker  \
--conf spark.driver.host=jupyter-pyspark.default.svc.cluster.local \
--conf spark.driver.port=9888 \
--class org.apache.spark.examples.SparkPi  \
--name spark-pi  \
local:///opt/spark/examples/jars/spark-examples_2.11-2.4.3.jar 10

I am then looking at the created executor pod.
All seems well, it can connect back to the driver pod on 9888 but then:

19/05/26 11:35:38 ERROR RetryingBlockFetcher: Exception while beginning
fetch of 1 outstanding blocks
java.io.IOException: Connecting to
jupyter-pyspark.default.svc.cluster.local/10.111.230.250:37615 timed out
(12 ms)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:243)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:187)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(NettyBlockTransferService.scala:114)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:141)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:121)
at
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:124)
at
org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:98)
at
org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:757)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:162)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:151)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:151)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.broadcast.TorrentBroadcast.org
$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:151)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:231)
at scala.Option.getOrElse(Option.scala:121)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1326)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:84)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



What does this mean?

Thanks in advance
David


RE: Run SQL on files directly

2018-12-08 Thread David Markovitz
Thanks Subhash
I am familiar with the other APIs, but I am curious about this specific one and I 
could not figure it out from the git repository.
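
For what it's worth, one workaround that keeps everything in SQL while still
passing reader options is to register a temporary view over the file instead of
using the csv.`path` shortcut. A sketch (assuming an active SparkSession named
spark; the path and option values are placeholders):

spark.sql("""
  CREATE TEMPORARY VIEW my_csv
  USING csv
  OPTIONS (path '/my/path/myfile.csv', header 'true', sep ';', encoding 'UTF-8')
""")
spark.sql("SELECT * FROM my_csv").show()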

Best regards,

David (דודו) Markovitz
Technology Solutions Professional, Data Platform
Microsoft Israel

Mobile: +972-525-834-304
Office: +972-747-119-274


From: Subhash Sriram 
Sent: Saturday, December 8, 2018 10:38 PM
To: David Markovitz 
Cc: user@spark.apache.org
Subject: Re: Run SQL on files directly

Hi David,

I’m not sure if that is possible, but why not just read the CSV file using the 
Scala API, specifying those options, and then query it using SQL by creating a 
temp view?

Thanks,
Subhash
Sent from my iPhone

On Dec 8, 2018, at 12:39 PM, David Markovitz wrote:
Hi
Spark SQL supports direct querying on files 
(here <https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#run-sql-on-files-directly>),
 e.g. –


select * from csv.`/my/path/myfile.csv`

Does anybody know if it possible to pass options (sep, header, encoding etc.) 
with this syntax?

Thanks


Best regards,

David (דודו) Markovitz
Technology Solutions Professional, Data Platform
Microsoft Israel

Mobile: +972-525-834-304
Office: +972-747-119-274





Run SQL on files directly

2018-12-08 Thread David Markovitz
Hi
Spark SQL supports direct querying on files 
(here<https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#run-sql-on-files-directly>),
 e.g. –


select * from csv.`/my/path/myfile.csv`

Does anybody know if it possible to pass options (sep, header, encoding etc.) 
with this syntax?

Thanks


Best regards,

David (דודו) Markovitz
Technology Solutions Professional, Data Platform
Microsoft Israel

Mobile: +972-525-834-304
Office: +972-747-119-274




Spark event logging with s3a

2018-11-08 Thread David Hesson
We are trying to use spark event logging with s3a as a destination for event 
data.

We added these settings to the spark submits:

spark.eventLog.dir s3a://ourbucket/sparkHistoryServer/eventLogs
spark.eventLog.enabled true

Everything works fine with smaller jobs, and we can see the history data in the 
history server that’s also using s3a. However, when we tried a job with a few 
hundred gigs of data that goes through multiple stages, it was dying with OOM 
exception (same job works fine with spark.eventLog.enabled false)

18/10/22 23:07:22 ERROR util.Utils: uncaught error in thread SparkListenerBus, 
stopping SparkContext
java.lang.OutOfMemoryError
at 
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)

Full stack trace: 
https://gist.github.com/davidhesson/bd64a25f04c6bb241ec398f5383d671c

Does anyone have any insight or experience with using spark history server with 
s3a? Is this problem being caused by perhaps something else in our configs? Any 
help would be appreciated.


Seeing a framework registration loop with Spark 2.3.1 on DCOS 1.10.0

2018-09-04 Thread David Hesson
I’m attempting to use Spark 2.3.1 (spark-2.3.1-bin-hadoop2.7.tgz) in cluster 
mode and running into some issues. This is a cluster where we've had success 
using Spark 2.2.0 (spark-2.2.0-bin-hadoop2.7.tgz), and I'm simply upgrading our 
nodes with the new Spark 2.3.1 package and testing it out.

Some version information:

Spark v2.3.1
DC/OS v1.10.0
Mesos v1.4.0
Dispatcher: docker, mesosphere/spark:2.3.1-2.2.1-2-hadoop-2.6 (Docker image 
from https://github.com/mesosphere/spark-build)

This is a multi-node cluster. I'm submitting a job that's using the sample 
spark-pi jar included in the distribution. Occasionally, spark submits run 
without issue. Then a run will begin execution where a bunch of TASK_LOST 
messages occur immediately, followed by the BlockManager attempting to remove a 
handful of non-existent executors. I also can see where the driver/scheduler 
begins making a tight loop of SUBSCRIBE requests to the master.mesos service. 
The request volume and frequency is so high that the mesos.master stops 
responding to other requests, and eventually runs OOM and systemd restarts the 
failed process. If there is only one job running, and it's able to start an 
executor (exactly one started in my sample logs), the job will eventually 
complete. However, if I deploy multiple jobs (five seemed to do the trick), 
I've seen cases where none of the jobs complete, and the cluster begins to have 
cascading failures due to the master not servicing other API requests due to 
the influx of REGISTER requests from numerous spark driver frameworks.

Logs:
Problematic run (stdout, stderr, mesos.master logs): 
https://gist.github.com/davidhesson/791cb3101db2521a51478ff4e2d22841
Successful run (stdout, stderr; for comparison): 
https://gist.github.com/davidhesson/66e32196834b849cd2919dba8275cd4a
Snippet of flood of subscribes hitting master node: 
https://gist.github.com/davidhesson/2c5d22e4f87fad85ce975bc074289136
Spark submit JSON: 
https://gist.github.com/davidhesson/c0c77dffe48965650fd5bbb078731900


Re: How to add a new source to exsting struct streaming application, like a kafka source

2018-08-01 Thread David Rosenstrauch




On 08/01/2018 12:36 PM, Robb Greathouse wrote:

How to unsubscribe?


List-Unsubscribe: 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Fwd: Array[Double] two time slower then DenseVector

2018-05-09 Thread David Ignjić
Hello all,
I am currently trying to squeeze a little more performance out of one Spark
application, using the code attached to this email.

I looked into the difference, and in
org.apache.spark.sql.catalyst.CatalystTypeConverters.ArrayConverter,
even when the element type is primitive we still go through the boxing/unboxing
path, because org.apache.spark.sql.catalyst.util.ArrayData#toArray
does not use ArrayData.toDoubleArray the way VectorUDT does.

Now the question is: do I need to provide a patch, or can someone show me
how to get the same performance with Array[Double] as with DenseVector?
Or should I create a JIRA ticket?


Thanks
 import org.apache.spark.ml.linalg.{DenseVector, Vectors}
 import scala.util.Random
 import spark.implicits._
 
 val dotVector = udf {(x:DenseVector,y:DenseVector) => {
var i = 0; var dotProduct = 0.0
val size = x.size;val v1 = x.values; val v2 = y.values
while (i < size) {
  dotProduct += v1(i) * v2(i)
  i += 1
}
dotProduct}}
val dotSeq = udf {(x:Seq[Double],y:Seq[Double]) => {
var i = 0;var dotProduct = 0.0;val size = x.size
while (i < size) {
  dotProduct += x(i) * y(i)
  i += 1
}
dotProduct}}
def time(name: String, block: => Unit): Float = {
val t0 = System.nanoTime()
block // call-by-name
val t1 = System.nanoTime()
//println(s"$name: " + (t1 - t0) / 10f + "s")
((t1 - t0)/ 10f )
 }
val densevector = udf { (p: Seq[Float]) =>  Vectors.dense(p.map(_.toDouble).toArray)   }
val genVec = udf { (l:Int,c:Int) => {
 val r = new Random(l*c)
 (1 to 300).map(p => r.nextDouble()).toArray}
 }

val dfBig = {Seq(1).toDF("s")
.withColumn("line",explode(lit((1 to 1000).toArray)))
.withColumn("column",explode(lit((1 to 200).toArray)))
.withColumn("v1",genVec(col("line").+(lit(22)).*(lit(-1)),col("column")))
.withColumn("v2",genVec(col("line"),col("column")))
.withColumn("v1d",densevector(col("v1")))
.withColumn("v2d",densevector(col("v2")))
.repartition(1)
.persist()}
dfBig.count
dfBig.show(10)

val arrayTime =(1 to 20).map {p=> time("array",dfBig.withColumn("dot",dotSeq(col("v1"),col("v2"))).sort(desc("dot")).limit(10).collect())}.sum /20
val vectorTime = (1 to 20).map {p=> time("array",dfBig.withColumn("dot",dotVector(col("v1d"),col("v2d"))).sort(desc("dot")).limit(10).collect())}.sum / 20
vectorTime/ arrayTime *100

spark.python.worker.reuse not working as expected

2018-04-26 Thread David Figueroa
Given this code block:

import os
from pyspark.sql import SparkSession

# each partition yields the PID of the Python worker process that handled it
def return_pid(_): yield os.getpid()

spark = SparkSession.builder.getOrCreate()

pids = set(spark.sparkContext.range(32).mapPartitions(return_pid).collect())
print(pids)

pids = set(spark.sparkContext.range(32).mapPartitions(return_pid).collect())
print(pids)

I was expecting that the same Python process ids would be printed twice.
Instead, completely different Python process ids are being printed.

spark.python.worker.reuse is true by default, but this unexpected
behavior still occurs even if spark.python.worker.reuse=true is set explicitly.


What's the best way to have Spark a service?

2018-03-15 Thread David Espinosa
Hi all,

I'm quite new to Spark, and I would like to ask what's the best way to run
Spark as a service; by that I mean being able to include the response
of a Scala app/job running in Spark in an ordinary RESTful request.

Up to now I have read about Apache Livy (which I tried and found
incompatibility problems with my Scala app), Spark Jobserver, Finch, and I
have read that I could also use Spark Streams.

Thanks in advance,
David


Re: how to add columns to row when column has a different encoder?

2018-02-28 Thread David Capwell
Anyone know a way right now to do this? As best as I can tell I need a
custom expression to pass to udf to do this.

Just finished a protobuf encoder and it feels like expression is not meant
to be public (good amount of things are private[sql]), am I wrong about
this? Am I looking at the right interface to add such a UDF?

Thanks for your help!

On Mon, Feb 26, 2018, 3:50 PM David Capwell <dcapw...@gmail.com> wrote:

> I have a row that looks like the following pojo
>
> case class Wrapper(var id: String, var bytes: Array[Byte])
>
> Those bytes are a serialized pojo that looks like this
>
> case class Inner(var stuff: String, var moreStuff: String)
>
> I right now have encoders for both the types, but I don't see how to merge
> the two into a unified row that looks like the following
>
>
> struct<id: string, inner: struct<stuff: string, moreStuff: string>>
>
> If I know how to deserialize the bytes and have a encoder, how could I get
> the above schema?  I was looking at ds.withColumn("inner", ???) but wasn't
> sure how to go from pojo + encoder to a column.  Is there a better way to
> do this?
>
> Thanks for your time reading this email
>


Re: SizeEstimator

2018-02-27 Thread David Capwell
Thanks for the reply and sorry for my delayed response, had to go find the
profile data to lookup the class again.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala

That class extends SizeEstimator and has a field "map" which buffers the
rows.  In my case the buffer was > 1 million rows so became costly every
time it was checked.


This can be reproduced, create a random data set of (string, long), then
group by string (I believe this is what the code did first, there was a
sort later but should have been a different stage).  Make sure number of
executors is small (for example only one) else you are reducing the size of
M for each executor.

On Mon, Feb 26, 2018, 10:04 PM 叶先进 <advance...@gmail.com> wrote:

> What type is for the buffer you mentioned?
>
>
> On 27 Feb 2018, at 11:46 AM, David Capwell <dcapw...@gmail.com> wrote:
>
> advancedxy <advance...@gmail.com>, I don't remember the code as well
> anymore but what we hit was a very simple schema (string, long). The issue
> is the buffer had a million of these so SizeEstimator of the buffer had to
> keep recalculating the same elements over and over again.  SizeEstimator
> was on-cpu about 30% of the time, bounding the buffer got it to be < 5%
> (going off memory so may be off).
>
> The class info(size of fields lay on heap) is cached for every occurred
> class, so the size info of the same elements would not be recalculated.
> However, for Collection class (or similar) SizeEstimator will scan all the
> elements in the container (`next` field in LinkedList for example).
>
> And the array is a special case: SizeEstimator will sample array if
> array.length > ARRAY_SIZE_FOR_SAMPLING(400).
>
> The cost is really (assuming memory is O(1) which is not true) O(N × M)
> where N is number of rows in buffer and M is size of schema.  My case could
> be solved by not recomputing which would bring the cost to O(M) since
> bookkeeping should be consistent time. There was logic to delay
> recalculating bases off a change in frequency, but that didn't really do
> much for us, bounding and spilling was the bigger win in our case.
>
> On Mon, Feb 26, 2018, 7:24 PM Xin Liu <xin.e@gmail.com> wrote:
>
>> Thanks David. Another solution is to convert the protobuf object to byte
>> array, It does speed up SizeEstimator
>>
>> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com>
>> wrote:
>>
>>> This is used to predict the current cost of memory so spark knows to
>>> flush or not. This is very costly for us so we use a flag marked in the
>>> code as private to lower the cost
>>>
>>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no
>>> typo) - how many records before flush
>>>
>>> This lowers the cost because it let's us leave data in young, if we
>>> don't bound we get everyone promoted to old and GC becomes a issue.  This
>>> doesn't solve the fact that the walk is slow, but lowers the cost of GC.
>>> For us we make sure to have spare memory on the system for page cache so
>>> spilling to disk for us is a memory write 99% of the time.  If your host
>>> has less free memory spilling may become more expensive.
>>>
>>>
>>> If the walk is your bottleneck and not GC then I would recommend JOL and
>>> guessing to better predict memory.
>>>
>>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e@gmail.com> wrote:
>>>
>>>> Hi folks,
>>>>
>>>> We have a situation where, shuffled data is protobuf based, and
>>>> SizeEstimator is taking a lot of time.
>>>>
>>>> We have tried to override SizeEstimator to return a constant value,
>>>> which speeds up things a lot.
>>>>
>>>> My questions, what is the side effect of disabling SizeEstimator? Is it
>>>> just spark do memory reallocation, or there is more severe consequences?
>>>>
>>>> Thanks!
>>>>
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread David Capwell
advancedxy <advance...@gmail.com>, I don't remember the code as well
anymore but what we hit was a very simple schema (string, long). The issue
is the buffer had a million of these so SizeEstimator of the buffer had to
keep recalculating the same elements over and over again.  SizeEstimator
was on-cpu about 30% of the time, bounding the buffer got it to be < 5%
(going off memory so may be off).

The cost is really (assuming memory access is O(1), which is not true) O(N × M),
where N is the number of rows in the buffer and M is the size of the schema.  My case could
be solved by not recomputing, which would bring the cost to O(M) since
bookkeeping should be constant time. There was logic to delay
recalculating based off a change in frequency, but that didn't really do
much for us; bounding and spilling was the bigger win in our case.

On Mon, Feb 26, 2018, 7:24 PM Xin Liu <xin.e@gmail.com> wrote:

> Thanks David. Another solution is to convert the protobuf object to byte
> array, It does speed up SizeEstimator
>
> On Mon, Feb 26, 2018 at 5:34 PM, David Capwell <dcapw...@gmail.com> wrote:
>
>> This is used to predict the current cost of memory so spark knows to
>> flush or not. This is very costly for us so we use a flag marked in the
>> code as private to lower the cost
>>
>> spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no
>> typo) - how many records before flush
>>
>> This lowers the cost because it let's us leave data in young, if we don't
>> bound we get everyone promoted to old and GC becomes a issue.  This doesn't
>> solve the fact that the walk is slow, but lowers the cost of GC. For us we
>> make sure to have spare memory on the system for page cache so spilling to
>> disk for us is a memory write 99% of the time.  If your host has less free
>> memory spilling may become more expensive.
>>
>>
>> If the walk is your bottleneck and not GC then I would recommend JOL and
>> guessing to better predict memory.
>>
>> On Mon, Feb 26, 2018, 4:47 PM Xin Liu <xin.e@gmail.com> wrote:
>>
>>> Hi folks,
>>>
>>> We have a situation where, shuffled data is protobuf based, and
>>> SizeEstimator is taking a lot of time.
>>>
>>> We have tried to override SizeEstimator to return a constant value,
>>> which speeds up things a lot.
>>>
>>> My questions, what is the side effect of disabling SizeEstimator? Is it
>>> just spark do memory reallocation, or there is more severe consequences?
>>>
>>> Thanks!
>>>
>>
>


Re: SizeEstimator

2018-02-26 Thread David Capwell
This is used to predict the current cost of memory so spark knows to flush
or not. This is very costly for us so we use a flag marked in the code as
private to lower the cost

spark.shuffle.spill.numElementsForceSpillThreshold (on phone hope no typo)
- how many records before flush

This lowers the cost because it lets us leave data in the young generation; if we don't
bound it, everything gets promoted to the old generation and GC becomes an issue.  This doesn't
solve the fact that the walk is slow, but it lowers the cost of GC. For us, we
make sure to have spare memory on the system for the page cache, so spilling to
disk is a memory write 99% of the time.  If your host has less free
memory, spilling may become more expensive.


If the walk is your bottleneck and not GC then I would recommend JOL and
guessing to better predict memory.
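
For anyone who wants to try the flag mentioned above, here is a minimal sketch of
setting it when the session is created (shown from PySpark); the threshold value
is an arbitrary example, and since the property is internal and undocumented its
behaviour may vary across Spark versions:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("bounded-spill-example")
         .config("spark.shuffle.spill.numElementsForceSpillThreshold", "1000000")
         .getOrCreate())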

On Mon, Feb 26, 2018, 4:47 PM Xin Liu  wrote:

> Hi folks,
>
> We have a situation where, shuffled data is protobuf based, and
> SizeEstimator is taking a lot of time.
>
> We have tried to override SizeEstimator to return a constant value, which
> speeds up things a lot.
>
> My questions, what is the side effect of disabling SizeEstimator? Is it
> just spark do memory reallocation, or there is more severe consequences?
>
> Thanks!
>


how to add columns to row when column has a different encoder?

2018-02-26 Thread David Capwell
I have a row that looks like the following pojo

case class Wrapper(var id: String, var bytes: Array[Byte])

Those bytes are a serialized pojo that looks like this

case class Inner(var stuff: String, var moreStuff: String)

I right now have encoders for both the types, but I don't see how to merge
the two into a unified row that looks like the following


struct<id: string, inner: struct<stuff: string, moreStuff: string>>

If I know how to deserialize the bytes and have a encoder, how could I get
the above schema?  I was looking at ds.withColumn("inner", ???) but wasn't
sure how to go from pojo + encoder to a column.  Is there a better way to
do this?

Thanks for your time reading this email


Re: Encoder with empty bytes deserializes with non-empty bytes

2018-02-21 Thread David Capwell
Ok found my issue

case c if c == classOf[ByteString] =>
  StaticInvoke(classOf[Protobufs], ArrayType(ByteType),
"fromByteString", parent :: Nil)

Should be

case c if c == classOf[ByteString] =>
  StaticInvoke(classOf[Protobufs], BinaryType, "fromByteString", parent :: Nil)



This causes the Java code to see a byte[], which uses a different code path
than the one linked.  Since I used ArrayType(ByteType), I had to wrap the data in an
ArrayData class.



On Wed, Feb 21, 2018 at 9:55 PM, David Capwell <dcapw...@gmail.com> wrote:

> I am trying to create a Encoder for protobuf data and noticed something
> rather weird.  When we have a empty ByteString (not null, just empty), when
> we deserialize we get back a empty array of length 8.  I took the generated
> code and see something weird going on.
>
> UnsafeRowWriter
>
>
>   public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
>     final long relativeOffset = currentCursor - startingOffset;
>     final long fieldOffset = getFieldOffset(ordinal);
>     final long offsetAndSize = (relativeOffset << 32) | size;
>
>     Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
>   }
>
>
>
> So this takes the size of the array and stores it... but its not the array
> size, its how many bytes were added
>
> rowWriter2.setOffsetAndSize(2, tmpCursor16, holder.cursor - tmpCursor16);
>
>
>
> So since the data is empty the only method that moves the cursor forward is
>
> arrayWriter1.initialize(holder, numElements1, 8);
>
> which does the following
>
> holder.cursor += (headerInBytes + fixedPartInBytes);
>
> in a debugger I see that headerInBytes = 8 and fixedPartInBytes = 0.
>
> Here is the header write
>
>
>   Platform.putLong(holder.buffer, startingOffset, numElements);
>   for (int i = 8; i < headerInBytes; i += 8) {
>     Platform.putLong(holder.buffer, startingOffset + i, 0L);
>   }
>
>
>
>
> Ok so so far this makes sense, in order to deserialize you need to know
> about the data, so all good. Now to look at the deserialize path
>
>
> UnsafeRow.java
>
> @Override
> public byte[] getBinary(int ordinal) {
>   if (isNullAt(ordinal)) {
> return null;
>   } else {
> final long offsetAndSize = getLong(ordinal);
> final int offset = (int) (offsetAndSize >> 32);
> final int size = (int) offsetAndSize;
> final byte[] bytes = new byte[size];
> Platform.copyMemory(
>   baseObject,
>   baseOffset + offset,
>   bytes,
>   Platform.BYTE_ARRAY_OFFSET,
>   size
> );
> return bytes;
>   }
> }
>
>
>
> Since this doesn't read the header to return the user-bytes, it tries to
> return header + user-data.
>
>
>
> Is this expected? Am I supposed to filter out the header and force a
> mem-copy to filter out for just the user-data? Since header appears to be
> dynamic, how would I know the header length?
>
> Thanks for your time reading this email.
>
>
> Spark version: spark_2.11-2.2.1
>


Encoder with empty bytes deserializes with non-empty bytes

2018-02-21 Thread David Capwell
I am trying to create an Encoder for protobuf data and noticed something
rather weird.  When we have an empty ByteString (not null, just empty) and
we deserialize, we get back a non-empty array of length 8.  I took the generated
code and see something weird going on.

UnsafeRowWriter


  public void setOffsetAndSize(int ordinal, long currentCursor, long size) {
    final long relativeOffset = currentCursor - startingOffset;
    final long fieldOffset = getFieldOffset(ordinal);
    final long offsetAndSize = (relativeOffset << 32) | size;

    Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
  }



So this takes the size of the array and stores it... but it's not the array
size, it's how many bytes were added

rowWriter2.setOffsetAndSize(2, tmpCursor16, holder.cursor - tmpCursor16);



So since the data is empty the only method that moves the cursor forward is

arrayWriter1.initialize(holder, numElements1, 8);

which does the following

holder.cursor += (headerInBytes + fixedPartInBytes);

in a debugger I see that headerInBytes = 8 and fixedPartInBytes = 0.

Here is the header write


  Platform.putLong(holder.buffer, startingOffset, numElements);
  for (int i = 8; i < headerInBytes; i += 8) {
    Platform.putLong(holder.buffer, startingOffset + i, 0L);
  }




OK, so far this makes sense: in order to deserialize you need to know
about the data, so all good. Now to look at the deserialize path.


UnsafeRow.java

@Override
public byte[] getBinary(int ordinal) {
  if (isNullAt(ordinal)) {
return null;
  } else {
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
final byte[] bytes = new byte[size];
Platform.copyMemory(
  baseObject,
  baseOffset + offset,
  bytes,
  Platform.BYTE_ARRAY_OFFSET,
  size
);
return bytes;
  }
}



Since this doesn't read the header to return the user-bytes, it tries to
return header + user-data.



Is this expected? Am I supposed to filter out the header and force a
mem-copy to filter out for just the user-data? Since header appears to be
dynamic, how would I know the header length?

Thanks for your time reading this email.


Spark version: spark_2.11-2.2.1


Re: How to hold some data in memory while processing rows in a DataFrame?

2018-01-23 Thread David Rosenstrauch
That sounds like it might fit the bill.  I'll take a look - thanks!
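
A tiny PySpark sketch of the windowing idea quoted below (the dataframe and
column names are placeholders): lag() makes the previous row's value available
on the current row, so no driver-side local variables are needed:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy('user_id').orderBy('event_time')
df_with_prev = df.withColumn('prev_value', F.lag('value', 1).over(w))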

DR

On Mon, Jan 22, 2018 at 11:26 PM, vermanurag 
wrote:

> Looking at description of problem window functions may solve your issue. It
>  allows operation over a window that can include records before/ after the
> particular record
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to hold some data in memory while processing rows in a DataFrame?

2018-01-23 Thread David Rosenstrauch
Thanks, but broadcast variables won't achieve what I'm looking to do.  I'm
not trying to just share a one-time set of data across the cluster.
Rather, I'm trying to set up a small cache of info that's constantly being
updated based on the records in the dataframe.

DR

On Mon, Jan 22, 2018 at 10:41 PM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> If I understand your requirement correct.
> Use broadcast variables to replicate across all nodes the small amount of
> data you wanted to reuse.
>
>
>
> On Mon, Jan 22, 2018 at 9:24 PM David Rosenstrauch <daro...@gmail.com>
> wrote:
>
>> This seems like an easy thing to do, but I've been banging my head
>> against the wall for hours trying to get it to work.
>>
>> I'm processing a spark dataframe (in python).  What I want to do is, as
>> I'm processing it I want to hold some data from one record in some local
>> variables in memory, and then use those values later while I'm processing a
>> subsequent record.  But I can't see any way to do this.
>>
>> I tried using:
>>
>> dataframe.select(a_custom_udf_function('some_column'))
>>
>> ... and then reading/writing to local variables in the udf function, but
>> I can't get this to work properly.
>>
>> My next guess would be to use dataframe.foreach(a_custom_function) and
>> try to save data to local variables in there, but I have a suspicion that
>> may not work either.
>>
>>
>> What's the correct way to do something like this in Spark?  In Hadoop I
>> would just go ahead and declare local variables, and read and write to them
>> in my map function as I like.  (Although with the knowledge that a) the
>> same map function would get repeatedly called for records with many
>> different keys, and b) there would be many different instances of my code
>> spread across many machines, and so each map function running on an
>> instance would only see a subset of the records.)  But in Spark it seems to
>> be extraordinarily difficult to create local variables that can be read
>> from / written to across different records in the dataframe.
>>
>> Perhaps there's something obvious I'm missing here?  If so, any help
>> would be greatly appreciated!
>>
>> Thanks,
>>
>> DR
>>
>>


How to hold some data in memory while processing rows in a DataFrame?

2018-01-22 Thread David Rosenstrauch
 This seems like an easy thing to do, but I've been banging my head against
the wall for hours trying to get it to work.

I'm processing a spark dataframe (in python).  What I want to do is, as I'm
processing it I want to hold some data from one record in some local
variables in memory, and then use those values later while I'm processing a
subsequent record.  But I can't see any way to do this.

I tried using:

dataframe.select(a_custom_udf_function('some_column'))

... and then reading/writing to local variables in the udf function, but I
can't get this to work properly.

My next guess would be to use dataframe.foreach(a_custom_function) and try
to save data to local variables in there, but I have a suspicion that may
not work either.


What's the correct way to do something like this in Spark?  In Hadoop I
would just go ahead and declare local variables, and read and write to them
in my map function as I like.  (Although with the knowledge that a) the
same map function would get repeatedly called for records with many
different keys, and b) there would be many different instances of my code
spread across many machines, and so each map function running on an
instance would only see a subset of the records.)  But in Spark it seems to
be extraordinarily difficult to create local variables that can be read
from / written to across different records in the dataframe.

Perhaps there's something obvious I'm missing here?  If so, any help would
be greatly appreciated!

Thanks,

DR


spark datatypes

2017-12-03 Thread David Hodefi
Looking at the source code, it seems like DateType's internal type is Int.


"class DateType private() extends AtomicType {

  // The companion object and this class is separated so the companion
object also subclasses
  // this type. Otherwise, the companion object would be of type
"DateType$" in byte code.
  // Defined with a private constructor so the companion object is the
only possible instantiation.
  private[sql] type InternalType = Int


"

I know that Timestamp requires 10 digits, which is why we use Long and not Int.

Can someone explain why InternalType is Int?
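
For context, DateType is documented as the number of days since 1970-01-01, with
no time-of-day component, so the value comfortably fits in an Int; a quick
back-of-the-envelope check:

import datetime

days = (datetime.date(2017, 12, 3) - datetime.date(1970, 1, 1)).days
print(days)              # roughly 17.5 thousand days
print(days < 2**31 - 1)  # True, far below the Int maximum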


Thanks David


Re: Spark SQL - Truncate Day / Hour

2017-11-13 Thread David Hodefi
I am familiar with those functions; none of them actually truncates a
date. We could use those methods to help implement a truncate method. I think
truncating to a day/hour should be as simple as truncate(..., "DD") or
truncate(..., "HH").

On Thu, Nov 9, 2017 at 8:23 PM, Gaspar Muñoz <gmu...@datiobd.com> wrote:

> There are functions for day (called dayOfMonth and dayOfYear) and hour
> (called hour). You can view them here: https://spark.apache.
> org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions
>
> Example:
>
> import org.apache.spark.sql.functions._
> val df = df.select(hour($"myDateColumn"), dayOfMonth($"myDateColumn"),
> dayOfYear($"myDateColumn"))
>
> 2017-11-09 12:05 GMT+01:00 David Hodefi <davidhodeffi.w...@gmail.com>:
>
>> I would like to truncate date to his day or hour. currently it is only
>> possible to truncate MONTH or YEAR.
>> 1.How can achieve that?
>> 2.Is there any pull request about this issue?
>> 3.If there is not any open pull request about this issue, what are the
>> implications that I should be aware of when coding /contributing it as a
>> pull request?
>>
>> Last question is,  Looking at DateTImeUtils class code, it seems like
>> implementation is not using any open library for handling dates i.e
>> apache-common , Why implementing it instead of reusing open source?
>>
>> Thanks David
>>
>
>
>
> --
> Gaspar Muñoz Soria
>
> Vía de las dos Castillas, 33
> <https://maps.google.com/?q=V%C3%ADa+de+las+dos+Castillas,+33=gmail=g>,
> Ática 4, 3ª Planta
> 28224 Pozuelo de Alarcón, Madrid
> Tel: +34 91 828 6473
>


Spark SQL - Truncate Day / Hour

2017-11-09 Thread David Hodefi
I would like to truncate a date to its day or hour. Currently it is only
possible to truncate to MONTH or YEAR.
1. How can I achieve that?
2. Is there any pull request about this issue?
3. If there is no open pull request about this issue, what are the
implications that I should be aware of when coding/contributing it as a
pull request?

Last question: looking at the DateTimeUtils class code, it seems like the
implementation is not using any open library for handling dates (e.g.
apache-commons). Why implement it instead of reusing open source?

Thanks David


Testing spark e-mail list

2017-11-09 Thread David Hodeffi




RE: Hive From Spark: Jdbc VS sparkContext

2017-11-05 Thread David Hodeffi
Testing Spark group e-mail




spark sql truncate function

2017-10-30 Thread David Hodeffi
I saw that it is possible to truncate a date with MM or YY, but it is not
possible to truncate by WEEK, HOUR or MINUTE.
Am I right? Is there any objection to supporting it, or is it just not
implemented yet?


Thanks David



Dynamic Accumulators in 2.x?

2017-10-11 Thread David Capwell
I wrote a Spark instrumentation tool that instruments RDDs to give more
fine-grained details on what is going on within a Task.  This is working
right now, but uses volatiles and CAS to pass around this state (which
slows down the task).  We want to lower the overhead of this, make the
main call path single-threaded, and pass around the result when the task
completes; which sounds like AccumulatorV2.

I started rewriting the instrumented logic to be based off accumulators,
but having a hard time getting these to show up in the UI/API (using this
to see if I am linking things properly).

So my question is as follows.

When running in the executor and we create a accumulator (that was not
created from SparkContext), how would I stitch things properly so it shows
up with accumulators defined from the spark context?  If this is different
for different versions that is fine since we can figure that out quickly
(hopefully) and change the instrumentation.

Approches taken:

Looked at SparkContext.register and copied the same logic, but at runtime

this.hasNextTotal = new LongAccumulator();
this.hasNextTotal.metadata_$eq(new
AccumulatorMetadata(AccumulatorContext.newId(),
createName("hasNextTotal"), false));
AccumulatorContext.register(hasNextTotal);


That didn't end up working

I tried getting the context from SparkContext.getActive, but it's not
defined at runtime.


Option opt = SparkContext$.MODULE$.getActive();
if (opt.isDefined()) {
SparkContext sc = opt.get();
hasNextTotal.register(sc, Option.apply("hasNext"), false);
nextTotal.register(sc, Option.apply("next"), false);
}


Any help on this would be much appreciated! I would really rather not
reinvent the wheel if I can piggy-back off Accumulators.

Thanks for your help!

Target spark version: 2.2.0


add jars to spark's runtime

2017-10-11 Thread David Capwell
We want to emit the metrics out of spark into our own custom store.  To do
this we built our own sink and tried to add it to spark by doing --jars
path/to/jar and defining the class in metrics.properties which is supplied
with the job.  We noticed that spark kept crashing with the below exception

17/10/11 09:42:37 ERROR metrics.MetricsSystem: Sink class
com.example.ExternalSink cannot be instantiated
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1707)
at
org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: java.lang.ClassNotFoundException: com.example.ExternalSink
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:230)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:198)
at
org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:194)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
at
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
at
org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:194)
at
org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:102)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:366)
at org.apache.spark.SparkEnv$.createExecutorEnv(SparkEnv.scala:201)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$run$1.apply$mcV$sp(CoarseGrainedExecutorBackend.scala:223)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:67)
at
org.apache.spark.deploy.SparkHadoopUtil$$anon$2.run(SparkHadoopUtil.scala:66)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
... 4 more

We then added this jar into the spark tarball that we use for testing, and
saw that it was able to load just fine and publish.

My question is, how can I add this jar to the spark runtime rather than the
user runtime?  In production we don't have permissions to write to the jars
dir of spark, so that trick to get this working won't work for us.

Thanks for your time reading this email.

Tested on spark 2.2.0


Re: error in running StructuredStreaming-Kafka integration code (Spark 2.x & Kafka 10)

2017-07-10 Thread David Newberger
Karen,

It looks like the Kafka version is incorrect. You mention Kafka 0.10
however the classpath references Kafka 0.9

Thanks,

David

On July 10, 2017 at 1:44:06 PM, karan alang (karan.al...@gmail.com) wrote:

Hi All,

I'm running Spark Streaming - Kafka integration using Spark 2.x & Kafka 10.
& seems to be running into issues.

I compiled the program using sbt, and the compilation went through fine.
I was able able to import this into Eclipse & run the program from Eclipse.

However, when i run the program using spark-submit, i'm getting the
following error :

--

>  $SPARK_HOME/bin/spark-submit --class 
> "structuredStreaming.kafka.StructuredKafkaWordCount1"
> --master local[2] /Users/karanalang/Documents/Te
> chnology/Coursera_spark_scala/structuredStreamingKafka/targe
> t/scala-2.11/StructuredStreamingKafka-assembly-1.0.jar



> *java.lang.ClassNotFoundException:
> structuredStreaming.kafka.StructuredKafkaWordCount1*
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy
> $SparkSubmit$$runMain(SparkSubmit.scala:695)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
--

I've put the jar in the classpath, but i still get the error ->

echo $CLASSPATH

.:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.
> 9.0.1/lib/jopt-simple-3.2.jar:/Users/karanalang/Documents/Te
> chnology/kafka/kafka_2.11-0.9.0.1/lib/kafka-clients-0.9.0.1.
> jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-
> 0.9.0.1/lib/kafka_2.11-0.9.0.1.jar:/Users/karanalang/
> Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/log4j-
> 1.2.17.jar:/Users/karanalang/Documents/Technology/kafka/
> kafka_2.11-0.9.0.1/lib/metrics-core-2.2.0.jar:/Users/karanalang/Documents/
> Technology/kafka/kafka_2.11-0.9.0.1/lib/scala-library-2.11.
> 7.jar:/Users/karanalang/Documents/Technology/kafka/
> kafka_2.11-0.9.0.1/lib/slf4j-api-1.7.6.jar:/Users/
> karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/
> lib/slf4j-log4j12-1.7.6.jar:/Users/karanalang/Documents/
> Technology/kafka/kafka_2.11-0.9.0.1/lib/snappy-java-1.1.1.7.
> jar:/Users/karanalang/Documents/Technology/kafka/kafka_2.11-0.9.0.1/lib/
> zkclient-0.7.jar:/Users/karanalang/Documents/Technolog
> y/kafka/kafka_2.11-0.9.0.1/lib/zookeeper-3.4.6.jar:/
> Users/karanalang/Documents/Technology/ApacheSpark-v2.1/spark
> -2.1.0-bin-hadoop2.7/jars/*.jar:/Users/karanalang/Document
> s/Technology/kafka/mirrormaker_topic_rename-master/target/
> mmchangetopic-1.0-SNAPSHOT.jar:/Users/karanalang/Documents/Technology/
> *Coursera_spark_scala/structuredStreamingKafka/target/scala-2.11/*
> *StructuredStreamingKafka-assembly-1.0.jar*


When i look inside the jar - *StructuredStreamingKafka-assembly-1.0.jar, i
don't see the file "*StructuredKafkaWordCount1.class"

Attaching my build.sbt.

Any ideas on what i need to do ?














Re: How to perform clean-up after stateful streaming processes an RDD?

2017-06-06 Thread David Rosenstrauch
Thanks much for the suggestion.  Reading over your mail though, I realize
that I may not have made something clear:  I don't just have a single
external service object; the idea is that I have one per executor, so that
the code running on each executor accesses the external service
independently.  (And performs its cleanup independently as well.)  So I
actually don't want stateUpdater.cleanupExternalService to run on the
driver, but rather to run on each executor.

So to be a bit more explicit, using words rather than code, what I'm trying
to do is:

In the driver:
* Start reading the incoming stream of data (event strings, in my case)
* Run stateful streaming to ingest the stream of incoming data and update
state objects with it (user summaries, in my case)
* Output all the completed user summaries (to Kafka, in my case) for
further downstream processing

In each executor (i.e., RDD partition), using stateful streaming / the
state spec function:
* Process a batch of incoming event strings
* Update each event string into the user summary
* In the process of building the user summary, retrieve some data from the
external service (lazily initialized)
* Emit each completed user summary as output from the stateful streaming
state spec function
* After all event strings in the batch have been processed, perform cleanup
on this executor's interaction with the external service by writing some
data back to the service

I'm able to get all of this working except for that last bullet point.

There doesn't seem to be any way to access the external service connection
object being used in my state spec function and then, after the RDD batch
is done, tell it to write back the data it's holding.  Stateful streaming
doesn't seem to give one much to work with in this regard.  It doesn't
notify you in any way that a batch is complete; it just repeatedly calls
your state spec function for each incoming record, without any indication
as to when one batch ended and another one started.  And the instances of
the objects that each executor deserializes and calls your state spec
function on don't seem to be accessible from ... well, anywhere else other
than in the state spec function itself.

Any ideas how what I'm trying to do might be achievable using stateful
streaming?

Thanks,

DR


On Tue, Jun 6, 2017 at 7:31 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> It looks like the clean up should go into the foreachRDD function:
>
> stateUpdateStream.foreachRdd(...) { rdd =>
> // do stuff with the rdd
>
>   stateUpdater.cleanupExternalService// should work in this position
> }
>
> Code within the foreachRDD(*) executes on the driver, so you can keep the
> state of the object there.
>
> What will not work is to update the stateUpdater state from a side effect
> of the stateUpdateFunction used in the mapWithState transformation and
> expect those changes to be visible at the call site sketched above.
>
> kr, Gerard.
>
> (*) a typical construct found in the wild is:
> dstream.foreachRDD{rdd =>
>// do some preparation
>rdd.operation{elem => ... }
>...
>// close/clean/report
> }
> So the code within the foreachRDD closure executes on the driver, *but*
> the code within the rdd.operation{...} closure is a spark operation and
> executes distributed on the executors.
> One must be careful of not incorrectly mixing the scopes, in particular
> when holding on to local state.
>
>
>
> On Wed, Jun 7, 2017 at 1:08 AM, David Rosenstrauch <daro...@gmail.com>
> wrote:
>
>> We have some code we've written using stateful streaming (mapWithState)
>> which works well for the most part.  The stateful streaming runs, processes
>> the RDD of input data, calls the state spec function for each input record,
>> and does all proper adding and removing from the state cache.
>>
>> However, I have a need to do some cleanup after stateful streaming
>> processes the input data RDD, and I can't seem to find any place where we
>> can put that code where it will run when it's supposed to.
>>
>> Essentially our state spec function needs to a) call out to an external
>> service, b) hold some data from that service, and then c) inform the
>> service to clean up the remaining data once the RDD is complete.
>>
>> I've gotten to the point where the code looks approximately like this:
>>
>>
>> val eventStream = incomingStream.transform(...)
>>
>> val stateUpdater = new StateUpdater
>> val stateUpdateStream = 
>> eventStream.mapWithState(stateUpdater.stateUpdateFunction
>> _)
>>
>> stateUpdateStream.foreachRdd(...) {
>> ...
>> }
>> stateUpdater.cleanupExternalService// DOES NOT WORK!
>>
>>
>> class StateUpdater extends Serializable {
>>

How to perform clean-up after stateful streaming processes an RDD?

2017-06-06 Thread David Rosenstrauch
We have some code we've written using stateful streaming (mapWithState)
which works well for the most part.  The stateful streaming runs, processes
the RDD of input data, calls the state spec function for each input record,
and does all proper adding and removing from the state cache.

However, I have a need to do some cleanup after stateful streaming
processes the input data RDD, and I can't seem to find any place where we
can put that code where it will run when it's supposed to.

Essentially our state spec function needs to a) call out to an external
service, b) hold some data from that service, and then c) inform the
service to clean up the remaining data once the RDD is complete.

I've gotten to the point where the code looks approximately like this:


val eventStream = incomingStream.transform(...)

val stateUpdater = new StateUpdater
val stateUpdateStream =
eventStream.mapWithState(stateUpdater.stateUpdateFunction _)

stateUpdateStream.foreachRdd(...) {
...
}
stateUpdater.cleanupExternalService// DOES NOT WORK!


class StateUpdater extends Serializable {

def stateUpdateFunction(key, value, state) {
if (!state.initalized) {
state.initialize(externalService)
}
...
}

def cleanupExternalService {
externalService.cleanup  // writes some data back to the external service
}

@transient lazy val externalService = new ExternalService
}


Note that the ExternalService object is holding onto a small bit of state
that it needs to write back to the external service once we have completed
running the stateUpdateFunction on every record in the RDD.  However this
code doesn't actually work.  Because of the way Spark serializes objects on
the driver and then deserializes them onto the executor, there's no way for
me to get a hold of the ExternalService object that is being used on each
RDD partition and clean up its leftover data.  Those objects seem to be
held internally somewhere in the bowels of stateful streaming (where it
processes an RDD of incoming data and applies it to the state).  And back
in the main code where I'm trying to call the cleanup method, I'm actually
calling it on a totally different object than the one that ran in the RDD
partitions.  And stateful streaming doesn't provide me with any opportunity
to perform any cleanup processing - say by calling some "rddDone" method to
notify me that it just finished doing state processing on an RDD.  It just
calls the state spec function over and over, once for every record, and never
notifies me that we've ended processing one RDD or started a new one.


Is there any way out of this conundrum?  I've tried to avoid the problem by
moving my interactions with the external service outside of the state spec
function.  But that didn't work:  the interaction with the external service
is really needed inside of the state spec function, and I caused a bug in
our code when I tried to move it.

Any suggestions that I might not have thought of on how to fix this issue?
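
One pattern that might cover that last step, sketched below on the assumption that a per-JVM singleton is acceptable and that cleanup may run once per partition (so it has to be safe to call repeatedly). ExternalService, StateUpdater and stateUpdateStream are taken from the code above; ExternalServiceHolder and the stand-in output are hypothetical:

// One instance per executor JVM; both the state spec function and the cleanup
// below go through this holder, so they see the same lazily created connection.
object ExternalServiceHolder {
  lazy val service = new ExternalService
}

// Inside stateUpdateFunction, use ExternalServiceHolder.service instead of a
// field on the serialized StateUpdater instance.

stateUpdateStream.foreachRDD { rdd =>
  // foreachPartition runs on the executors; consuming the iterator forces the
  // state updates for that partition to be computed before cleanup is called.
  rdd.foreachPartition { partition =>
    partition.foreach(userSummary => println(userSummary))  // stand-in for the real output
    ExternalServiceHolder.service.cleanup()
  }
}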

Thanks,

DR


statefulStreaming checkpointing too often

2017-06-01 Thread David Rosenstrauch
I'm running into a weird issue with a stateful streaming job I'm running.
(Spark 2.1.0 reading from kafka 0-10 input stream.)

From what I understand from the docs, by default the checkpoint interval
for stateful streaming is 10 * batchInterval.  Since I'm running a batch
interval of 10 seconds, I would expect that checkpointing should only get
done every 100 seconds.  But what I'm seeing is that Spark is not only
checkpointing every 10 seconds, it's checkpointing twice every 10 seconds!

My code looks approximately like the following:

val eventStream = kafkaStream.
transform(
...
).
map(
...
).
transform(
...
)

val stateStream = eventStream.mapWithState(
...
)

stateStream.foreachRDD(
...
)


When the app initializes, the checkpointing configuration looks like so:

17/06/01 21:19:05 INFO DirectKafkaInputDStream: Duration for remembering
RDDs set to 20 ms for
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Slide time = 1 ms
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Storage level = Serialized
1x Replicated
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Remember interval = 20
ms
17/06/01 21:19:05 INFO DirectKafkaInputDStream: Initialized and validated
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4a85a52c
17/06/01 21:19:05 INFO TransformedDStream: Slide time = 1 ms
17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x
Replicated
17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 20 ms
17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated
org.apache.spark.streaming.dstream.TransformedDStream@201d4bfb
17/06/01 21:19:05 INFO MappedDStream: Slide time = 1 ms
17/06/01 21:19:05 INFO MappedDStream: Storage level = Serialized 1x
Replicated
17/06/01 21:19:05 INFO MappedDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO MappedDStream: Remember interval = 20 ms
17/06/01 21:19:05 INFO MappedDStream: Initialized and validated
org.apache.spark.streaming.dstream.MappedDStream@1208bde7
17/06/01 21:19:05 INFO TransformedDStream: Slide time = 1 ms
17/06/01 21:19:05 INFO TransformedDStream: Storage level = Serialized 1x
Replicated
17/06/01 21:19:05 INFO TransformedDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO TransformedDStream: Remember interval = 20 ms
17/06/01 21:19:05 INFO TransformedDStream: Initialized and validated
org.apache.spark.streaming.dstream.TransformedDStream@370b0505
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Slide time = 1 ms
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Storage level = Memory
Deserialized 1x Replicated
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval =
10 ms
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Remember interval =
20 ms
17/06/01 21:19:05 INFO InternalMapWithStateDStream: Initialized and
validated
org.apache.spark.streaming.dstream.InternalMapWithStateDStream@746c7658
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Slide time = 1 ms
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Storage level = Serialized
1x Replicated
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Checkpoint interval = null
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Remember interval = 1 ms
17/06/01 21:19:05 INFO MapWithStateDStreamImpl: Initialized and validated
org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@75d7326b
17/06/01 21:19:05 INFO ForEachDStream: Slide time = 1 ms
17/06/01 21:19:05 INFO ForEachDStream: Storage level = Serialized 1x
Replicated
17/06/01 21:19:05 INFO ForEachDStream: Checkpoint interval = null
17/06/01 21:19:05 INFO ForEachDStream: Remember interval = 1 ms
17/06/01 21:19:05 INFO ForEachDStream: Initialized and validated
org.apache.spark.streaming.dstream.ForEachDStream@2b3b2628


Note that there's one line that's correctly showing the 100 second
checkpointing interval:

17/06/01 21:19:05 INFO InternalMapWithStateDStream: Checkpoint interval =
10 ms


And yet the app is still performing checkpointing every 10 seconds ...
twice every 10 seconds, in fact!

17/06/01 21:19:10 INFO CheckpointWriter: Submitted checkpoint of time
149635195 ms to writer queue
17/06/01 21:19:10 INFO CheckpointWriter: Saving checkpoint for time
149635195 ms to file
'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-149635195'
17/06/01 21:19:10 INFO CheckpointWriter: Checkpoint for time 149635195
ms saved to file
'hdfs://hadoopmaster01:8020/user/dmx/checkpoint/dev/profilebuild/checkpoint-149635195',
took 8324 bytes and 165 ms
17/06/01 21:19:11 INFO CheckpointWriter: Submitted checkpoint of time
149635195 ms to writer queue
17/06/01 21:19:11 INFO CheckpointWriter: Saving 
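
One knob that may be worth trying here (a sketch only, not verified against this job): requesting the interval explicitly on the state stream instead of relying on the 10 x batchInterval default. DStream.checkpoint takes the desired interval as a Duration:

import org.apache.spark.streaming.Seconds

// applied to the stateStream defined above
stateStream.checkpoint(Seconds(100))   // ask for checkpointing every 100 seconds explicitly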

Re: Kafka 0.8.x / 0.9.x support in structured streaming

2017-05-15 Thread David Kaczynski
I haven't done Structured Streaming in Spark 2.1 with Kafka 0.9.x, but I
did do stream processing with Spark 2.0.1 and Kafka 0.10.

Here's the official documenation that I used for Spark Streaming with Kafka
0.10:  https://spark.apache.org/docs/2.1.0/streaming-kafka-integration.html.
It looks like you might need to use the 0.8.x connector, but I found that
connector to be inadequate for some use cases, such as using a regex to
subscribe to multiple topics.  Your results may vary, and I wouldn't be
surprised if there was another source of official documentation on the
topic.
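
For the 0.8.x connector route mentioned above (DStreams rather than Structured Streaming), a minimal sketch; the artifact is spark-streaming-kafka-0-8, and the broker list and topic name below are placeholders:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(10))   // sc: an existing SparkContext
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("my_topic")

// Records arrive as (key, value) pairs.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.map(_._2).print()
ssc.start()
ssc.awaitTermination()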

On Mon, May 15, 2017 at 5:25 AM Swapnil Chougule 
wrote:

> Hello
>
> I am new to structured streaming. Wanted to learn if there is support for
> Kafka 0.8.x or Kafka 0.9.x in structured streaming ?
> My Kafka source is of version 0.9.x & I want to have a structured streaming
> solution on top of it.
> I checked documentation for Spark release 2.1.0 but didn't get exact info.
> Any help is most welcome. Thanks in advance.
>
> --Swapnil
>


Re: running spark program on intellij connecting to remote master for cluster

2017-05-10 Thread David Kaczynski
Do you have Spark installed locally on your laptop with IntelliJ?  Are you
using the SparkLauncher class or your local spark-submit script?  A while
back, I was trying to submit a spark job from my local workstation to a
remote cluster using the SparkLauncher class, but I didn't actually have
SPARK_HOME set or the spark-submit script on my local machine yet, so the
submit was failing.  I think the error I was getting was that SPARK_HOME
environment variable was not set, though.
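
For the programmatic jar-distribution part of the question, a minimal sketch of pointing SparkConf at a remote standalone master and shipping the application jar via setJars (master URL and jar path are placeholders; a YARN cluster would still go through spark-submit or SparkLauncher):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("ide-submit-test")
  .setMaster("spark://master-host:7077")                        // remote standalone master
  .setJars(Seq("target/scala-2.11/my-app-assembly-1.0.jar"))    // executors fetch this jar
val sc = new SparkContext(conf)

// Closures in the job below now run on the remote executors.
println(sc.parallelize(1 to 1000).map(_ * 2).sum())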

On Wed, May 10, 2017 at 5:51 AM s t  wrote:

> Hello,
>
> I am trying to run spark code from my laptop with IntelliJ. I have a cluster
> of 2 nodes and a master. When I start the program from IntelliJ it gets
> errors about some missing classes.
>
> I am aware that some jars need to be distributed to the workers but do not
> know if it is possible programmatically. spark-submit or a Jupyter notebook
> handles the issue but IntelliJ does not.
>
> can any one give some advices to me ?
>
>


Re: Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

2017-04-28 Thread David Rosenstrauch
Yes, I saw that sentence too.  But it's rather short and not very
explanatory, and there doesn't seem to be any further info available
anywhere that expands on it.

When I parse out that sentence:

1) "Kafka is not transactional" - i.e., the commits are done
asynchronously, not synchronously.
2) "so your outputs must still be idempotent" - some of your commits may
duplicate/overlap, so you need to be able to handle processing the same
event(s) more than once.

That doesn't quite make sense to me though.  I don't quite understand why
#1 implies #2.  Yes, Kafka isn't transactional - i.e., doesn't process my
commits synchronously.  But it should be processing my commits
*eventually*.  If you look at my output from the previous message, even
though I called commitAsync on 250959 -> 250962 in the first job, Kafka
never actually processed those commits.  That's not an
eventual/asynchronous commit; that's an optional commit.

Is that in fact the semantics here - i.e., calls to commitAsync are not
actually guaranteed to succeed?  If that's the case, the docs could really
be a *lot* clearer about that.

Thanks,

DR

On Fri, Apr 28, 2017 at 11:34 AM, Cody Koeninger <c...@koeninger.org> wrote:

> From that doc:
>
> " However, Kafka is not transactional, so your outputs must still be
> idempotent. "
>
>
>
> On Fri, Apr 28, 2017 at 10:29 AM, David Rosenstrauch <daro...@gmail.com>
> wrote:
> > I'm doing a POC to test recovery with spark streaming from Kafka.  I'm
> using
> > the technique for storing the offsets in Kafka, as described at:
> >
> > https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-
> integration.html#kafka-itself
> >
> > I.e., grabbing the list of offsets before I start processing a batch of
> > RDD's, and then committing them when I'm done.  The process pretty much
> > works:  when I shut down my streaming process and then start it up
> again, it
> > pretty much picks up where it left off.
> >
> > However, it looks like there's some overlap happening, where a few of the
> > messages are being processed by both the old and the new streaming job
> runs.
> > I.e., see the following log messages:
> >
> > End of old job run:
> > 17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
> > OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> > 250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> > 18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> > 18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])
> >
> > Start of new job run:
> > 17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
> > OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
> > 100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
> > 100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
> > 100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
> > 251044])
> >
> >
> > Notice that in partition 0, for example, the 3 messages with offsets
> 250959
> > through 250961 are being processed twice - once by the old job, and once
> by
> > the new.  I would have expected that in the new run, the offset range for
> > partition 0 would have been 250962 -> 251044, which would result in
> > exactly-once semantics.
> >
> > Am I misunderstanding how this should work?  (I.e., exactly-once
> semantics
> > is not possible here?)
> >
> > Thanks,
> >
> > DR
>


Spark user list seems to be rejecting/ignoring my emails from other subscribed address

2017-04-28 Thread David Rosenstrauch
I've been subscribed to the user@spark.apache.org list at another email
address since 2014.  That address receives all emails sent to the list
without a problem.  But for some reason any emails that *I* send to the
list from that address get ignored or rejected.  (Similarly, any emails I
send to user-faq@, user-help@, user-info@ from that address are also
ignored or rejected.)

This issue has been going on for me for several weeks, at least - possibly
several months.

Any idea what the problem might be / how to fix?  I emailed
user-ow...@spark.apache.org about the problem yesterday but haven't heard
anything back.

Thanks,

DR


Exactly-once semantics with kakfa CanCommitOffsets.commitAsync?

2017-04-28 Thread David Rosenstrauch
 I'm doing a POC to test recovery with spark streaming from Kafka.  I'm
using the technique for storing the offsets in Kafka, as described at:

https://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html#kafka-itself

I.e., grabbing the list of offsets before I start processing a batch of
RDD's, and then committing them when I'm done.  The process pretty much
works:  when I shut down my streaming process and then start it up again,
it pretty much picks up where it left off.

However, it looks like there's some overlap happening, where a few of the
messages are being processed by both the old and the new streaming job
runs.  I.e., see the following log messages:

End of old job run:
17/04/27 20:04:40 INFO KafkaRecoveryTester$: Committing rdd offsets:
OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
250962]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
18]);OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
18]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 -> 17])

Start of new job run:
17/04/27 20:56:50 INFO KafkaRecoveryTester$: Processing rdd with offsets:
OffsetRange(topic: 'Datalake', partition: 3, range: [15 ->
100]);OffsetRange(topic: 'Datalake', partition: 1, range: [15 ->
100]);OffsetRange(topic: 'Datalake', partition: 2, range: [14 ->
100]);OffsetRange(topic: 'Datalake', partition: 0, range: [250959 ->
251044])


Notice that in partition 0, for example, the 3 messages with offsets 250959
through 250961 are being processed twice - once by the old job, and once by
the new.  I would have expected that in the new run, the offset range for
partition 0 would have been 250962 -> 251044, which would result in
exactly-once semantics.

Am I misunderstanding how this should work?  (I.e., exactly-once semantics
is not possible here?)
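
For reference, the commit-the-offsets-yourself pattern from that integration guide looks roughly like the sketch below (stream and variable names hypothetical). The key point is that commitAsync only enqueues the commit, so anything after the last commit that actually reached the broker can be replayed on restart:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Produce the batch's output here; replays of uncommitted offsets will
  // re-run this for the same records, so the write needs to be idempotent.
  rdd.foreachPartition { records =>
    records.foreach(record => println(record.value()))   // stand-in for the real sink
  }

  // Enqueue the commit once the output has been produced.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}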

Thanks,

DR


[MLlib] Multiple estimators for cross validation

2017-03-14 Thread David Leifker
I am hoping to open a discussion around the cross validation in mllib. I
found that I often wanted to evaluate multiple estimators/pipelines (with
different algorithms) or the same estimator with different parameter grids.
The CrossValidator and TrainValidationSplit only allow a single estimator
and parameter grid.

I played around with the idea a bit after looking at jira and other PRs to
see if someone else had already done something. I didn't come across
anything so I put some code together to at least solve my use case. It is
backwards compatible at an api level and has the ability to read the
previous serialized version.

I am considering opening a pull request, however I am interested in what
folks here think. This would be my first contribution.

The general idea is the ability to do this and be able to select the best
model.

// Configure an ML pipeline using nb.
val nb = new NaiveBayes()
val pipeline1 = new Pipeline("p1").setStages(Array(tokenizer,
hashingTF, nb))
val paramGrid1 = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100))
  .build()

// Configure an ML pipeline using lr.
val lr = new LogisticRegression().setMaxIter(10)
val pipeline2 = new Pipeline("p2").setStages(Array(tokenizer,
hashingTF, lr))
val paramGrid2 = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100))
  .build()

// Configure an ML pipeline using nb bernoulli (4 stages)
val binarizer = new Binarizer()
  .setInputCol(hashingTF.getOutputCol)
  .setOutputCol("binary_features")
val nb2 = new NaiveBayes()
  .setModelType("bernoulli")
  .setFeaturesCol(binarizer.getOutputCol)
val pipeline3 = new Pipeline("p3").setStages(Array(tokenizer,
hashingTF, binarizer, nb2))
val paramGrid3 = new ParamGridBuilder()
  .addGrid(hashingTF.numFeatures, Array(10, 100))
  .build()

// cross validate with both pipelines
val cv = new CrossValidator()
  .setEstimators(Array(pipeline1, pipeline2, pipeline3))
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorsParamMaps(Array(paramGrid1, paramGrid2, paramGrid3))


Order of rows not preserved after cache + count + coalesce

2017-02-13 Thread David Haglund (external)
Hi,

I found something that surprised me, I expected the order of the rows to be 
preserved, so I suspect this might be a bug. The problem is illustrated with 
the Python example below:

In [1]:
df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
df.cache()
df.count()
df.coalesce(2).rdd.glom().collect()
Out[1]:
[[Row(n=1)], [Row(n=0), Row(n=2)]]

Note how n=1 comes before n=0, above.


If I remove the cache line I get the rows in the correct order and the same if 
I use df.rdd.count() instead of df.count(), see examples below:

In [2]:
df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
df.count()
df.coalesce(2).rdd.glom().collect()
Out[2]:
[[Row(n=0)], [Row(n=1), Row(n=2)]]

In [3]:
df = spark.createDataFrame([(i,) for i in range(3)], ['n'])
df.cache()
df.rdd.count()
df.coalesce(2).rdd.glom().collect()
Out[3]:
[[Row(n=0)], [Row(n=1), Row(n=2)]]


I use spark 2.1.0 and pyspark.

Regards,
/David


Re: Do jobs fail because of other users of a cluster?

2017-01-24 Thread David Frese

On 24/01/2017 at 02:43, Matthew Dailey wrote:

In general, Java processes fail with an OutOfMemoryError when your code
and data does not fit into the memory allocated to the runtime.  In
Spark, that memory is controlled through the --executor-memory flag.
If you are running Spark on YARN, then YARN configuration will dictate
the maximum memory that your Spark executors can request.  Here is a
pretty good article about setting memory in Spark on YARN:
http://www.cloudera.com/documentation/enterprise/5-5-x/topics/cdh_ig_running_spark_on_yarn.html

If the OS were to kill your process because the system has run out of
memory, you would see an error printed to standard error that looks like
this:

Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0xe232, 37601280, 0) failed; error='Cannot allocate memory' (errno=12)
# There is insufficient memory for the Java Runtime Environment to continue.



Thanks for your answer. But that does not explain my perception that
the chance of an OOM seems to rise when there are more jobs running, right?


Is the maximum memory YARN allows a statically defined value (I see 
yarn.scheduler.maximum-allocation-mb and other things), or can the 
actual value change dynamically, depending on the environment?


Anything else that could significantly change the memory requirements 
from run to run (with same program, data and settings)?


--
David





Do jobs fail because of other users of a cluster?

2017-01-18 Thread David Frese
Hello everybody,

being quite new to Spark, I am struggling a lot with OutOfMemory exceptions
and "GC overhead limit reached" failures of my jobs, submitted from a
spark-shell and "master yarn".

Playing with --num-executors, --executor-memory and --executor-cores I
occasionally get something done. But I'm also not the only one using the
cluster, and it seems to me, that my jobs sometimes fail with the above
errors, because other people have something running, or have a spark-shell
open at that time; or at least it seems that with the same code, data and
settings, the job sometimes completes and sometimes fails.

Is that "expected behaviour"?

What options/tools can be used to make the success/failure of a job
deterministic? There are a lot of things out there, like 'dynamic allocation'
and the Hadoop 'fair scheduler', but it is very hard for a newbie to evaluate
them (or to make suggestions to the admins).

If it cannot be made deterministic, how can I reliably distinguish the OOM
failures that are caused by incorrect settings on my side (e.g. because my
data does not fit into memory), and those failures that are caused by
resource consumption/blocking from other jobs?

Thanks for sharing your thoughts and experiences!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Do-jobs-fail-because-of-other-users-of-a-cluster-tp28318.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark Storage Tab is empty

2016-12-26 Thread David Hodeffi
I have tried the following code but didn't see anything on the storage tab.


val myrdd = sc.parallelize(1 to 100)
myrdd.setName("my_rdd")
myrdd.cache()
myrdd.collect()

Storage tab is empty, though I can see the stage of collect() .
I am using 1.6.2 ,HDP 2.5 , spark on yarn


Thanks David




Re: SPARK -SQL Understanding BroadcastNestedLoopJoin and number of partitions

2016-12-21 Thread David Hodeffi
Do you know who I can talk to about this code? I am really curious to know why 
there is a join and why the number of partitions for the join is the sum of both of 
them; I expected the number of partitions to be the same as the streamed table, 
or, worst case, multiplied.

Sent from my iPhone

On Dec 21, 2016, at 14:43, David Hodeffi 
<david.hode...@niceactimize.com<mailto:david.hode...@niceactimize.com>> wrote:


I have two dataframes which I am joining, a small one and a big one. The 
optimizer suggests using BroadcastNestedLoopJoin.
The number of partitions for the big DataFrame is 200 while the small DataFrame has 5 
partitions.
The joined dataframe results in 205 partitions (joined.rdd.partitions.size). 
I have tried to understand where this number comes from and figured out that 
BroadcastNestedLoopJoin is actually a union.
code:

case class BroadcastNestedLoopJoin {
  def doExecute() = {
    ...
    sparkContext.union(
      matchedStreamRows,
      sparkContext.makeRDD(notMatchedBroadcastRows))
  }
}

Can someone please explain what exactly the code of doExecute() does? Can you 
elaborate on all the null checks and why we can have nulls? Why do we have 
205 partitions? A link to a JIRA with a discussion that explains the code would 
help.





SPARK -SQL Understanding BroadcastNestedLoopJoin and number of partitions

2016-12-21 Thread David Hodeffi

I have two dataframes which I am joining, a small one and a big one. The 
optimizer suggests using BroadcastNestedLoopJoin.
The number of partitions for the big DataFrame is 200 while the small DataFrame has 5 
partitions.
The joined dataframe results in 205 partitions (joined.rdd.partitions.size). 
I have tried to understand where this number comes from and figured out that 
BroadcastNestedLoopJoin is actually a union.
code:

case class BroadcastNestedLoopJoin {
  def doExecute() = {
    ...
    sparkContext.union(
      matchedStreamRows,
      sparkContext.makeRDD(notMatchedBroadcastRows))
  }
}

Can someone please explain what exactly the code of doExecute() does? Can you 
elaborate on all the null checks and why we can have nulls? Why do we have 
205 partitions? A link to a JIRA with a discussion that explains the code would 
help.




RE: Launching multiple spark jobs within a main spark job.

2016-12-21 Thread David Hodeffi
I am not familiar with any problem with that.
Anyway, if you run a Spark application you will have multiple jobs, so it makes 
sense that this is not a problem.

Thanks David.
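
A fleshed-out sketch of the pattern in the sample code below, using the SparkLauncher API (paths, class name and master are placeholders):

import org.apache.spark.launcher.SparkLauncher

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Each Future runs an independent spark-submit under the hood and waits for it.
val future1 = Future {
  val child = new SparkLauncher()
    .setAppResource("/path/to/child-job.jar")
    .setMainClass("com.example.ChildJob")
    .setMaster("yarn")
    .setDeployMode("cluster")
    .launch()          // returns a java.lang.Process
  child.waitFor()      // exit code 0 on success
}

future1.onComplete(result => println(s"child job finished: $result"))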

From: Naveen [mailto:hadoopst...@gmail.com]
Sent: Wednesday, December 21, 2016 9:18 AM
To: d...@spark.apache.org; user@spark.apache.org
Subject: Launching multiple spark jobs within a main spark job.

Hi Team,

Is it ok to spawn multiple spark jobs within a main spark job? My main spark 
job's driver, which was launched on the yarn cluster, will do some preprocessing and, 
based on it, needs to launch multiple spark jobs on the yarn cluster. Not sure 
if this is the right pattern.

Please share your thoughts.
Sample code I have is below, for better understanding:
-

object MainSparkJob {

  def main(...) {

    val sc = new SparkContext(...)

    // Fetch from Hive using HiveContext
    // Fetch from HBase

    // Spawning multiple Futures...
    val future1 = Future {
      val sparkJob = new SparkLauncher(...).launch()
      sparkJob.waitFor()
    }

    // Similarly, future2 to futureN.

    future1.onComplete { ... }
  }

} // end of MainSparkJob
--



Re: What benefits do we really get out of colocation?

2016-12-03 Thread David Mitchell
To get a node local read from Spark to Cassandra, one has to use a read
consistency level of LOCAL_ONE.  For some use cases, this is not an
option.  For example, if you need to use a read consistency level
of LOCAL_QUORUM, as many use cases demand, then one is not going to get a
node local read.

Also, to ensure a node local read, one has to set spark.locality.wait to
zero.  Whether or not a partition will be streamed to another node or
computed locally is dependent on the spark.locality.wait parameter. This
parameter can be set to 0 to force all partitions to only be computed on
local nodes.
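
As a concrete illustration, the setting itself is just a SparkConf property (construction point and app name assumed):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("cassandra-reads")
  .set("spark.locality.wait", "0")   // disable the scheduler's locality wait, as discussed above
val sc = new SparkContext(conf)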

If you do some testing, please post your performance numbers.


Tracking opened files by Spark application

2016-11-25 Thread David Lauzon
Hi there!

Does Apache Spark offer a callback or some kind of plugin mechanism that
would allow this?

For a bit more context, I am looking to create a Record-Replay environment
using Apache Spark as the core processing engine. The goals are:

1. Trace the origins of every file generated by Spark: which application
generated them (jar file), its input files and parameters.

2. To be able to re-run a previous experiment easily, and ideally, be able
to change the parameters (eg. input files, etc.).

Any advice on how to tackle these goals?
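
One candidate worth looking at is the SparkListener interface, sketched below. It surfaces job/stage/task events and job properties, but the input and output paths themselves would still have to be recorded by the application; the listener class and its log format here are illustrative only:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Logs every job submitted through the SparkContext, together with the job
// description set via sc.setJobDescription, if any.
class ProvenanceListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    val description = Option(jobStart.properties)
      .flatMap(p => Option(p.getProperty("spark.job.description")))
    println(s"job ${jobStart.jobId} started, description=$description")
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"job ${jobEnd.jobId} finished with ${jobEnd.jobResult}")
}

sc.addSparkListener(new ProvenanceListener())   // sc: an existing SparkContext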

-D


newAPIHadoopFile throws a JsonMappingException: Infinite recursion (StackOverflowError) error

2016-11-17 Thread David Robison
I am trying to create a new JavaPairRDD from data in an HDFS file. My code is:

sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", 
sparkConf);
JavaPairRDD<LongWritable, BytesWritable> inputRDD = 
sparkContext.newAPIHadoopFile(fileFilter, FixedLengthInputFormat.class, 
LongWritable.class, BytesWritable.class, config);

However, when I run the job I get the following error:

com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion 
(StackOverflowError) (through reference chain: 
scala.collection.convert.IterableWrapper[0]->org.apache.spark.rdd.RDDOperationScope["allScopes"]->scala.collection.convert.IterableWrapper[0]->org.apache.spark.rdd.RDDOperationScope["allScopes"]->...)
  at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:680)
  at 
com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)
  at 
com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:132)
  at 
com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:30)
  at 
com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:16)
  at 
com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:185)
  at 
com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:575)
  at 
com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:666)
  at 
com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:156)

Any thoughts as to what may be going wrong?
David

David R Robison
Senior Systems Engineer
O. +1 512 247 3700
M. +1 757 286 0022
david.robi...@psgglobal.net<mailto:david.robi...@psgglobal.net>
www.psgglobal.net<http://www.psgglobal.net/>

Prometheus Security Group Global, Inc.
3019 Alvin Devane Boulevard
Building 4, Suite 450
Austin, TX 78741




RE: submitting a spark job using yarn-client and getting NoClassDefFoundError: org/apache/spark/Logging

2016-11-16 Thread David Robison
I’ve gotten a little further along. It now submits the job via Yarn, but now 
the jobs exit immediately with the following error:

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at 
org.apache.spark.deploy.yarn.ExecutorLauncher$.main(ApplicationMaster.scala:646)
at 
org.apache.spark.deploy.yarn.ExecutorLauncher.main(ApplicationMaster.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I’ve checked and the class does live in the spark assembly. Any thoughts as 
what might be wrong?


Best Regards,

David R Robison
Senior Systems Engineer

From: David Robison [mailto:david.robi...@psgglobal.net]
Sent: Wednesday, November 16, 2016 9:04 AM
To: Rohit Verma <rohit.ve...@rokittech.com>
Cc: user@spark.apache.org
Subject: RE: Problem submitting a spark job using yarn-client as master



Unfortunately, it doesn’t get that far in my code where I have a SparkContext 
from which to set the Hadoop config parameters. Here is my Java code:

SparkConf sparkConf = new SparkConf()
   .setJars(new String[] { 
"file:///opt/wildfly/mapreduce/mysparkjob-5.0.0.jar", })
   .setSparkHome("/usr/hdp/" + getHdpVersion() + "/spark")
   .set("fs.defaultFS", config.get("fs.defaultFS"))
   ;
sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", 
sparkConf);

The job dies in the constructor of the JavaSparkContext. I have a logging call 
right after creating the SparkContext and it is never executed.
Any idea what I’m doing wrong? David

Best Regards,

David R Robison
Senior Systems Engineer

From: Rohit Verma [mailto:rohit.ve...@rokittech.com]
Sent: Tuesday, November 15, 2016 9:27 PM
To: David Robison 
<david.robi...@psgglobal.net<mailto:david.robi...@psgglobal.net>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Problem submitting a spark job using yarn-client as master

you can set hdfs as defaults,

sparksession.sparkContext().hadoopConfiguration().set("fs.defaultFS", 
“hdfs://master_node:8020”);

Regards
Rohit

On Nov 16, 2016, at 3:15 AM, David Robison 
<david.robi...@psgglobal.net<mailto:david.robi...@psgglobal.net>> wrote:

I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Cli

RE: Problem submitting a spark job using yarn-client as master

2016-11-16 Thread David Robison
Unfortunately, it doesn’t get that far in my code where I have a SparkContext 
from which to set the Hadoop config parameters. Here is my Java code:

SparkConf sparkConf = new SparkConf()
   .setJars(new String[] { 
"file:///opt/wildfly/mapreduce/mysparkjob-5.0.0.jar", })
   .setSparkHome("/usr/hdp/" + getHdpVersion() + "/spark")
   .set("fs.defaultFS", config.get("fs.defaultFS"))
   ;
sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", 
sparkConf);

The job dies in the constructor of the JavaSparkContext. I have a logging call 
right after creating the SparkContext and it is never executed.
Any idea what I’m doing wrong? David
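
One thing that might be worth checking (a sketch, not a confirmed fix): a bare "fs.defaultFS" key on SparkConf is not copied into the Hadoop Configuration that Spark uses, whereas keys carrying the "spark.hadoop." prefix are; the more common route is simply to have HADOOP_CONF_DIR / YARN_CONF_DIR point at the cluster's client configs before the context is created. In Scala syntax for brevity (the equivalent Java SparkConf calls are the same; host and port come from Rohit's example):

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext

val sparkConf = new SparkConf()
  .setJars(Array("file:///opt/wildfly/mapreduce/mysparkjob-5.0.0.jar"))
  // "spark.hadoop."-prefixed properties are injected into the Hadoop Configuration,
  // so the YARN staging files resolve against HDFS rather than file:/tmp.
  .set("spark.hadoop.fs.defaultFS", "hdfs://master_node:8020")

val sparkContext = new JavaSparkContext("yarn-client", "SumFramesPerTimeUnit", sparkConf)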

Best Regards,

David R Robison
Senior Systems Engineer

From: Rohit Verma [mailto:rohit.ve...@rokittech.com]
Sent: Tuesday, November 15, 2016 9:27 PM
To: David Robison <david.robi...@psgglobal.net>
Cc: user@spark.apache.org
Subject: Re: Problem submitting a spark job using yarn-client as master

you can set hdfs as defaults,

sparksession.sparkContext().hadoopConfiguration().set("fs.defaultFS", 
“hdfs://master_node:8020”);

Regards
Rohit

On Nov 16, 2016, at 3:15 AM, David Robison 
<david.robi...@psgglobal.net<mailto:david.robi...@psgglobal.net>> wrote:

I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
15:39:38,102 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
view acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
modify acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) 
SecurityManager: authentication disabled; ui acls disabled; users with view 
permissions: Set(wildfly, hdfs); users with modify permissions: Set(wildfly, 
hdfs)
15:39:38,138 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Submitting application 5 to ResourceManager
15:39:38,256 INFO  [org.apache.hadoop.yarn.client.api.impl.YarnClientImpl] 
(default task-1) Submitted application application_1479240217825_0005
15:39:39,269 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:39,279 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1479242378159
final status: UNDEFINED
tracking URL: 
http://vb1.localdomain:8088/proxy/application_1479240217825_0005/
user: hdfs
15:39:40,285 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:41,290 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: FAILED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: Application application_1479240217825_0005 
failed 2 times due to AM Container for appattempt_1479240217825_0005_02 
exited with  exitCode: -1000

Problem submitting a spark job using yarn-client as master

2016-11-15 Thread David Robison
I am trying to submit a spark job through the yarn-client master setting. The 
job gets created and submitted to the clients but immediately errors out. Here 
is the relevant portion of the log:

15:39:37,385 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Requesting a new application from cluster with 1 NodeManagers
15:39:37,397 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Verifying our application has not requested more than the maximum memory 
capability of the cluster (4608 MB per container)
15:39:37,398 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) Will 
allocate AM container, with 896 MB memory including 384 MB overhead
15:39:37,399 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up container launch context for our AM
15:39:37,403 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Setting up the launch environment for our AM container
15:39:37,427 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Preparing resources for our AM container
15:39:37,845 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/opt/wildfly/modules/org/apache/hadoop/client/main/spark-yarn_2.10-1.6.2.jar
15:39:38,050 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Source and destination file systems are the same. Not copying 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
15:39:38,102 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
view acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) Changing 
modify acls to: wildfly,hdfs
15:39:38,105 INFO  [org.apache.spark.SecurityManager] (default task-1) 
SecurityManager: authentication disabled; ui acls disabled; users with view 
permissions: Set(wildfly, hdfs); users with modify permissions: Set(wildfly, 
hdfs)
15:39:38,138 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Submitting application 5 to ResourceManager
15:39:38,256 INFO  [org.apache.hadoop.yarn.client.api.impl.YarnClientImpl] 
(default task-1) Submitted application application_1479240217825_0005
15:39:39,269 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:39,279 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1479242378159
final status: UNDEFINED
tracking URL: 
http://vb1.localdomain:8088/proxy/application_1479240217825_0005/
user: hdfs
15:39:40,285 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:41,290 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: ACCEPTED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1) 
Application report for application_1479240217825_0005 (state: FAILED)
15:39:42,295 INFO  [org.apache.spark.deploy.yarn.Client] (default task-1)
 client token: N/A
diagnostics: Application application_1479240217825_0005 
failed 2 times due to AM Container for appattempt_1479240217825_0005_02 
exited with  exitCode: -1000
For more detailed output, check application tracking 
page:http://vb1.localdomain:8088/cluster/app/application_1479240217825_0005Then,
 click on links to logs of each attempt.
Diagnostics: File 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
 does not exist
java.io.FileNotFoundException: File 
file:/tmp/spark-fa954c4a-a6cd-4675-8610-67ce858b4842/__spark_conf__1435451360463636119.zip
 does not exist
at 
org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:609)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:822)
at 
org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:599)
at 
org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)


Notice that the file __spark_conf__1435451360463636119.zip is not copied 
because it exists, I believe on the hdfs. However when the client goes to fetch 
it, it is reporting that it does not exist, probably because it is trying to 
get it from "file:/tmp" not the hdfs. Any idea how I can get this to work?
Thanks, David

David R Robison
Senior Systems Engineer
O. +1 512 247 3700
M. +1 757 286 0022
david.robi...@psgglobal.net<mailto:david.robi...@psgglobal.net>
www.ps

creating a javaRDD using newAPIHadoopFile and FixedLengthInputFormat

2016-11-15 Thread David Robison
I am trying to create a Spark JavaRDD using newAPIHadoopFile and 
FixedLengthInputFormat. Here is my code snippet:

Configuration config = new Configuration();
// Each record in the .idx files has a fixed length of JPEG_INDEX_SIZE bytes.
config.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, JPEG_INDEX_SIZE);
config.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
// Glob over the index files on the default (HDFS) file system.
String fileFilter = config.get("fs.defaultFS") + "/A/B/C/*.idx";
JavaPairRDD<LongWritable, BytesWritable> inputRDD =
    sparkContext.newAPIHadoopFile(fileFilter, FixedLengthInputFormat.class,
        LongWritable.class, BytesWritable.class, config);

At this point I get the following exception:

Error executing mapreduce job: 
com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion 
(StackOverflowError)

Any idea what I am doing wrong? I am new to Spark. David

David R Robison
Senior Systems Engineer
O. +1 512 247 3700
M. +1 757 286 0022
david.robi...@psgglobal.net
www.psgglobal.net

Prometheus Security Group Global, Inc.
3019 Alvin Devane Boulevard
Building 4, Suite 450
Austin, TX 78741




Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-11-02 Thread Michael David Pedersen
Awesome, thank you Michael for the detailed example!

I'll look into whether I can use this approach for my use case. If so, I
could avoid the overhead of repeatedly registering a temp table for one-off
queries, instead registering the table once and relying on the injected
strategy. I don't know how much of an impact this overhead has in practice,
though.

Cheers,
Michael


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-11-01 Thread Michael David Pedersen
Thanks for the link, I hadn't come across this.

> According to
> https://forums.databricks.com/questions/400/what-is-the-difference-between-registertemptable-a.html
> and I quote
>
> "registerTempTable()
>
> registerTempTable() creates an in-memory table that is scoped to the
> cluster in which it was created. The data is stored using Hive's
> highly-optimized, in-memory columnar format."
>
But then the last post in the thread corrects this, saying:
"registerTempTable does not create a 'cached' in-memory table, but rather
an alias or a reference to the DataFrame. It's akin to a pointer in C/C++
or a reference in Java".

So - probably need to dig into the sources to get more clarity on this.
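
For my own reference, here is how I read the distinction in the 1.6-era API (just a sketch
based on the docs, assuming a DataFrame df and a SQLContext sqlContext):

df.registerTempTable("mytable")      // only registers a name for the DataFrame's plan
sqlContext.cacheTable("mytable")     // separate, explicit step: cache it in columnar form
sqlContext.uncacheTable("mytable")   // frees the cached data again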

Cheers,
Michael


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-11-01 Thread Michael David Pedersen
Hi again Mich,

"But the thing is that I don't explicitly cache the tempTables ..".
>
> I believe tempTable is created in-memory and is already cached
>

That surprises me since there is a sqlContext.cacheTable method to
explicitly cache a table in memory. Or am I missing something? This could
explain why I'm seeing somewhat worse performance than I'd expect.

Cheers,
Michael


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Michael David Pedersen
Hi Mich,

Thank you again for your reply.

As I see you are caching the table already sorted
>
> val keyValRDDSorted = keyValRDD.sortByKey().cache
>
> and the next stage is you are creating multiple tempTables (different
> ranges) that cache a subset of rows already cached in RDD. The data stored
> in tempTable is in Hive columnar format (I assume that means ORC format)
>

But the thing is that I don't explicitly cache the tempTables, and I don't
really want to because I'll only run a single query on each tempTable. So I
expect the SQL query processor to operate directly on the underlying
key-value RDD, and my concern is that this may be inefficient.


> Well that is all you can do.
>

Ok, thanks - that's really what I wanted to get confirmation of.


> Bear in mind that these tempTables are immutable and I do not know any way
> of dropping tempTable to free more memory.
>

I'm assuming there won't be any (significant) memory overhead of
registering the temp tables as long as I don't explicitly cache them. Am I
wrong? In any case I'll be calling sqlContext.dropTempTable once the query
has completed, which according to the documentation should also free up
memory.
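
Concretely, the lifecycle I have in mind looks roughly like this (table name and key range
are placeholders, using the queryRange helper from my original post):

val resultDF = queryRange("key000", "key100", "SELECT count(*) FROM range_tbl", "range_tbl")
resultDF.collect()
sqlContext.dropTempTable("range_tbl")   // release the reference once the query is done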

Cheers,
Michael


Re: Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Michael David Pedersen
Hi Mich,

Thank you for your quick reply!

What type of table is the underlying table? Is it Hbase, Hive ORC or what?
>

It is a custom datasource, but ultimately backed by HBase.


> By Key you mean a UNIQUE ID or something similar and then you do multiple
> scans on the tempTable which stores data using in-memory columnar format.
>

The key is a unique ID, yes. But note that I don't actually do multiple
scans on the same temp table: I create a new temp table for every query I
want to run, because each query will be based on a different key range. The
caching is at the level of the full key-value RDD.

If I did instead cache the temp table, I don't see a way of exploiting key
ordering for key range filters?


> That is the optimisation of tempTable storage as far as I know.
>

So it seems to me that my current solution won't be using this
optimisation, as I'm caching the RDD rather than the temp table.


> Have you tried it using predicate push-down on the underlying table itself?
>

No, because I essentially want to load the entire table into memory before
doing any queries. At that point I have nothing to push down.

Cheers,
Michael


Efficient filtering on Spark SQL dataframes with ordered keys

2016-10-31 Thread Michael David Pedersen
Hello,

I've got a Spark SQL dataframe containing a "key" column. The queries I
want to run start by filtering on the key range. My question in outline: is
it possible to sort the dataset by key so as to do efficient key range
filters, before subsequently running a more complex SQL query?

I'm aware that such efficient filtering is possible for key-value RDDs,
i.e. RDDs over Tuple2, using PairRDDFunctions. My workflow currently looks
as follows:

// Create a dataframe
val df: DataFrame = sqlContext.sql("SELECT * FROM ...")
val keyValRDD = df.rdd.map( (r: Row) => (r.getAs[String]("key"), r) )

// Sort by key - and cache.
val keyValRDDSorted = keyValRDD.sortByKey().cache

// Define a function to run an SQL query on a range.
def queryRange(lower: String, upper: String, sql: String, tableName: String) = {
  val rangeRDD = keyValRDDSorted.filterByRange(lower, upper)
  val rangeDF = sqlContext.createDataFrame(rangeRDD.map { _._2 }, df.schema)
  rangeDF.createTempView(tableName)
  sqlContext.sql(sql)
}

// Invoke multiple times.
queryRange(...)
queryRange(...)
...

This works, and is efficient in that only the partitions containing the
relevant key range are processed. However, I understand that Spark SQL uses
an optimised storage format as compared to plain RDDs. The above workflow
can't take advantage of this, as it is the key-value RDD that is cached.
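
For comparison, the pure-DataFrame variant I have considered looks roughly like this
(a sketch only; I am not sure it avoids scanning irrelevant partitions, hence the question):

val sortedDF = df.sort("key").cache()
sortedDF.count()   // materialise the columnar cache

def queryRangeDF(lower: String, upper: String): DataFrame =
  sortedDF.filter(sortedDF("key") >= lower && sortedDF("key") <= upper)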

So, my specific question: Is there a more efficient way of achieving the
desired result?

Any pointers would be much appreciated.

Many thanks,
Michael

PS: This question was also asked on StackOverflow -
http://stackoverflow.com/questions/40129411/efficient-filtering-on-spark-sql-dataframes-with-ordered-keys
.


Transforming Spark SQL AST with extraOptimizations

2016-10-25 Thread Michael David Pedersen
Hi,

I'm wanting to take a SQL string as a user input, then transform it before
execution. In particular, I want to modify the top-level projection (select
clause), injecting additional columns to be retrieved by the query.

I was hoping to achieve this by hooking into Catalyst using
sparkSession.experimental.extraOptimizations. I know that what I'm
attempting isn't strictly speaking an optimisation (the transformation
changes the semantics of the SQL statement), but the API still seems
suitable. However, my transformation seems to be ignored by the query
executor.

Here is a minimal example to illustrate the issue I'm having. First define
a row case class:

case class TestRow(a: Int, b: Int, c: Int)

Then define an optimisation rule which simply discards any projection:

object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case x: Project => x.child
  }
}

Now create a dataset, register the optimisation, and run a SQL query:

// Create a dataset and register table.
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)

// Register "optimisation".
sparkSession.experimental.extraOptimizations =
Seq(RemoveProjectOptimisationRule)

// Run query.
val projected = sqlContext.sql("SELECT a FROM " + tableName + " WHERE a =
1")

// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)

Here is the output:

Query result:
[1]

== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
   +- 'UnresolvedRelation `testtable`

== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
   +- SubqueryAlias testtable
  +- LocalRelation [a#3, b#4, c#5]

== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]

== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]

We see that the result is identical to that of the original SQL statement,
without the transformation applied. Yet, when printing the logical and
physical plans, the projection has indeed been removed. I've also confirmed
(through debug log output) that the transformation is indeed being invoked.
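
One further sanity check that might be worth doing is to apply the rule by hand to the
analysed plan and print the result; the Project node should disappear there, which would
narrow the question down to how the executor consumes the optimised plan:

val analysed = projected.queryExecution.analyzed
println(RemoveProjectOptimisationRule(analysed))   // the Project node should be stripped here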

Any suggestions as to what's going on here? Maybe the optimiser simply
ignores "optimisations" that change semantics?

If using the optimisations isn't the way to go, can anybody suggest an
alternative? All I really want to do is parse the input SQL statement,
transform it, and pass the transformed AST to Spark for execution. But as
far as I can see, the APIs for doing this are private to the Spark sql
package. It may be possible to use reflection, but I'd like to avoid that.

Any pointers would be much appreciated.

Cheers,
Michael

PS: I've previously posted this on StackOverflow, here:
http://stackoverflow.com/questions/40235566/transforming-spark-sql-ast-with-extraoptimizations
.


Need Advice: Spark-Streaming Setup

2016-08-01 Thread David Kaufman
Hi,

I'm currently working on a Spark and HBase setup which processes log files (~10
GB/day). These log files are persisted hourly on n > 10 application servers
and copied to a 4-node HDFS.

Our current Spark job aggregates single visits (based on a session UUID)
across all application servers on a daily basis. Visits are filtered (only
about 1% of the data remains) and stored in HBase for further processing.

Currently we make no use of the Spark Streaming API, i.e. a cron job runs
every day and triggers the visit calculation.

Questions
1) Is it really necessary to store the log files in the HDFS, or can Spark
somehow read the files from a local file system and distribute the data to
the other nodes? Rationale: the data is (probably) only read once during
the visit calculation, which defeats the purpose of a DFS.

2) If the raw log files have to be in the HDFS, I have to remove the files
from the HDFS after processing them, so COPY -> PROCESS -> REMOVE. Is this
the way to go?

3) Before I can process the visits for an hour, I have to wait until all log
files of all application servers have been copied to the HDFS. It doesn't
seem like StreamingContext.fileStream can wait for more sophisticated
patterns, e.g. ("context*/logs-2016-08-01-15"). Do you guys have a
recommendation to solve this problem? One possible solution: after the
files have been copied, create an additional file that signals to Spark that
all files are available (rough sketch below)?
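
To make question 3 concrete, the marker-file idea could look roughly like this (untested;
paths, marker name and input format are placeholders, and ssc is the StreamingContext):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Only accept a data file once the copy job has dropped a _COMPLETE marker next to it.
def hourIsComplete(p: Path): Boolean = {
  val fs = p.getFileSystem(new Configuration())
  !p.getName.startsWith("_") && fs.exists(new Path(p.getParent, "_COMPLETE"))
}

val logs = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs:///logs", hourIsComplete _, newFilesOnly = true).map(_._2.toString)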

If you have any questions, please don't hesitate to ask.

Thanks,
David


RE: HBase-Spark Module

2016-07-29 Thread David Newberger
Hi Ben,

This seems more like a question for community.cloudera.com. However, I believe it 
belongs under HBase rather than Spark.

https://repository.cloudera.com/artifactory/webapp/#/artifacts/browse/tree/General/cloudera-release-repo/org/apache/hbase/hbase-spark
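
If it helps, the artifact from that repository can be pulled in with something like the
following build.sbt sketch (the version string is a guess based on CDH 5.8.0, so please
verify it against the repository listing above):

resolvers += "cloudera-repos" at "https://repository.cloudera.com/artifactory/cloudera-repos/"
libraryDependencies += "org.apache.hbase" % "hbase-spark" % "1.2.0-cdh5.8.0"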

David Newberger


-Original Message-
From: Benjamin Kim [mailto:bbuil...@gmail.com] 
Sent: Friday, July 29, 2016 12:57 PM
To: user@spark.apache.org
Subject: HBase-Spark Module

I would like to know if anyone has tried using the hbase-spark module? I tried 
to follow the examples in conjunction with CDH 5.8.0. I cannot find the 
HBaseTableCatalog class in the module or in any of the Spark jars. Can someone 
help?

Thanks,
Ben



RE: difference between dataframe and dataframwrite

2016-06-16 Thread David Newberger
A DataFrame is a distributed collection of data organized into named columns.
DataFrame.write returns a DataFrameWriter, the interface for saving the contents 
of a DataFrame to external storage.
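
A small illustration of the two (the file paths are just examples):

val df = sqlContext.read.json("hdfs:///data/people.json")          // df is a DataFrame
df.write.mode("overwrite").parquet("hdfs:///data/people_parquet")  // df.write is a DataFrameWriter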

Hope this helps

David Newberger


From: pseudo oduesp [mailto:pseudo20...@gmail.com]
Sent: Thursday, June 16, 2016 9:43 AM
To: user@spark.apache.org
Subject: difference between dataframe and dataframwrite

hi,

what is the difference between DataFrame and DataFrame.write?



RE: streaming example has error

2016-06-16 Thread David Newberger
Try adding wordCounts.print() before ssc.start()
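
In other words, something like this at the end of your snippet (it registers an output
operation, so the streaming graph has something to execute):

val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()      // output operation; without one, start() fails as shown below
ssc.start()
ssc.awaitTermination()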


David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Wednesday, June 15, 2016 9:16 PM
To: David Newberger
Cc: user@spark.apache.org
Subject: Re: streaming example has error

I got another error: "StreamingContext: Error starting the context, marking it as 
stopped".

/home/martin/Downloads/spark-1.6.1/bin/spark-shell --packages 
com.databricks:spark-csv_2.11:1.4.0
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount").set("spark.driver.allowMultipleContexts",
 "true")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> val pairs = words.map(word => (word, 1))
pairs: org.apache.spark.streaming.dstream.DStream[(String, Int)] = 
org.apache.spark.streaming.dstream.MappedDStream@61a5e7

scala> val wordCounts = pairs.reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = 
org.apache.spark.streaming.dstream.ShuffledDStream@a522f1

scala> ssc.start()
16/06/15 19:14:10 ERROR StreamingContext: Error starting the context, marking 
it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations 
registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at 
org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at 
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:39)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:44)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:46)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:48)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:50)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:52)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:54)
at 
$line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
at $line42.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
at $line42.$read$$iwC$$iwC$$iwC$$iwC.(:62)
at $line42.$read$$iwC$$iwC$$iwC.(:64)
at $line42.$read$$iwC$$iwC.(:66)
at $line42.$read$$iwC.(:68)
at $line42.$read.(:70)
at $line42.$read$.(:74)
at $line42.$read$.()
at $line42.$eval$.(:7)
at $line42.$eval$.()
at $line42.$eval.$print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at 
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at 
org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at 
org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at 
org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at 
org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$proces

RE: Limit pyspark.daemon threads

2016-06-15 Thread David Newberger
Have you tried setting spark.cores.max

“When running on a standalone deploy 
cluster<http://spark.apache.org/docs/latest/spark-standalone.html> or a Mesos 
cluster in "coarse-grained" sharing 
mode<http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-run-modes>,
 the maximum amount of CPU cores to request for the application from across the 
cluster (not from each machine). If not set, the default will 
be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite 
(all available cores) on Mesos.”
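
For example, something along these lines when building the conf (shown in Scala just for
illustration; the numbers are placeholders to tune for your cluster):

val conf = new org.apache.spark.SparkConf()
  .set("spark.cores.max", "8")        // total cores for the app across the cluster
  .set("spark.executor.cores", "1")   // cores per executor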

David Newberger

From: agateaaa [mailto:agate...@gmail.com]
Sent: Wednesday, June 15, 2016 4:39 PM
To: Gene Pang
Cc: Sven Krasser; Carlile, Ken; user
Subject: Re: Limit pyspark.daemon threads

Thx Gene! But my concern is with CPU usage, not memory. I want to see if there 
is any way to control the number of pyspark.daemon processes that get spawned. 
We have some restriction on the number of CPUs we can use on a node, and the number 
of pyspark.daemon processes that get created doesn't seem to honor the 
spark.executor.cores property setting.
Thanks!

On Wed, Jun 15, 2016 at 1:53 PM, Gene Pang 
<gene.p...@gmail.com> wrote:
As Sven mentioned, you can use Alluxio to store RDDs in off-heap memory, and 
you can then share that RDD across different jobs. If you would like to run 
Spark on Alluxio, this documentation can help: 
http://www.alluxio.org/documentation/master/en/Running-Spark-on-Alluxio.html

Thanks,
Gene

On Tue, Jun 14, 2016 at 12:44 AM, agateaaa 
<agate...@gmail.com> wrote:
Hi,
I am seeing this issue too with pyspark (Using Spark 1.6.1).  I have set 
spark.executor.cores to 1, but I see that whenever streaming batch starts 
processing data, see python -m pyspark.daemon processes increase gradually to 
about 5, (increasing CPU% on a box about 4-5 times, each pyspark.daemon takes 
up around 100 % CPU)
After the processing is done 4 pyspark.daemon processes go away and we are left 
with one till the next batch run. Also sometimes the  CPU usage for executor 
process spikes to about 800% even though spark.executor.core is set to 1
e.g. top output
PID USER  PR   NI  VIRT  RES  SHR S   %CPU %MEMTIME+  COMMAND
19634 spark 20   0 8871420 1.790g  32056 S 814.1  2.9   0:39.33 /usr/lib/j+ 
<--EXECUTOR

13897 spark 20   0   46576  17916   6720 S   100.0  0.0   0:00.17 python -m 
+ <--pyspark.daemon
13991 spark 20   0   46524  15572   4124 S   98.0  0.0   0:08.18 python -m 
+ <--pyspark.daemon
14488 spark 20   0   46524  15636   4188 S   98.0  0.0   0:07.25 python -m 
+ <--pyspark.daemon
14514 spark 20   0   46524  15636   4188 S   94.0  0.0   0:06.72 python -m 
+ <--pyspark.daemon
14526 spark 20   0   48200  17172   4092 S   0.0  0.0   0:00.38 python -m + 
<--pyspark.daemon


Is there any way to control the number of pyspark.daemon processes that get 
spawned ?
Thank you
Agateaaa

On Sun, Mar 27, 2016 at 1:08 AM, Sven Krasser 
<kras...@gmail.com> wrote:
Hey Ken,

1. You're correct, cached RDDs live on the JVM heap. (There's an off-heap 
storage option using Alluxio, formerly Tachyon, with which I have no experience 
however.)

2. The worker memory setting is not a hard maximum unfortunately. What happens 
is that during aggregation the Python daemon will check its process size. If 
the size is larger than this setting, it will start spilling to disk. I've seen 
many occasions where my daemons grew larger. Also, you're relying on Python's 
memory management to free up space again once objects are evicted. In practice, 
leave this setting reasonably small but make sure there's enough free memory on 
the machine so you don't run into OOM conditions. If the lower memory setting 
causes strains for your users, make sure they increase the parallelism of their 
jobs (smaller partitions meaning less data is processed at a time).

3. I believe that is the behavior you can expect when setting 
spark.executor.cores. I've not experimented much with it and haven't looked at 
that part of the code, but what you describe also reflects my understanding. 
Please share your findings here, I'm sure those will be very helpful to others, 
too.

One more suggestion for your users is to move to the Pyspark DataFrame API. 
Much of the processing will then happen in the JVM, and you will bump into 
fewer Python resource contention issues.

Best,
-Sven


On Sat, Mar 26, 2016 at 1:38 PM, Carlile, Ken 
<carli...@janelia.hhmi.org> wrote:
This is extremely helpful!

I’ll have to talk to my users about how the python memory limit should be 
adjusted and what their expectations are. I’m fairly certain we bumped it up in 
the dark past when jobs were failing because of insufficient memory for the 
python processes.

So just to make sure I’m understanding correctly:


  *   JVM memory (set by SPARK_EX

RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
Hi Yogesh,

I'm not sure if this is possible or not; I'd be interested in knowing. My gut says it 
would be an anti-pattern even if it were possible, which is why I handle it in either 
foreachRDD or foreachPartition. The way I look at Spark Streaming is as an application 
which is always running and doing something like windowed batching or micro-batching or 
whatever I'm trying to accomplish. If an RDD I get from Kafka is empty, I don't run the 
rest of the job. If the RDD I get from Kafka has some number of events, then I process 
the RDD further.
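
In Scala, the pattern I mean looks roughly like this (connection details, group and topic
names are placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group",
  Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // only do work when the batch actually contains Kafka records
    rdd.map(_._2).take(5).foreach(println)
  }
}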

David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 8:30 AM
To: David Newberger
Subject: Re: Handle empty kafka in Spark Streaming

I am looking for something which checks the JavaPairReceiverInputDStream before 
going any further with operations.
For example, if I get the JavaPairReceiverInputDStream in the following
manner:

JavaPairReceiverInputDStream<String, String> 
message=KafkaUtils.createStream(ssc, zkQuorum, group, topics, 
StorageLevel.MEMORY_AND_DISK_SER());

Then I would like to check whether the message stream is empty or not. If it is not 
empty, then go on to further operations; otherwise, wait for some data in Kafka.

On Wed, Jun 15, 2016 at 6:31 PM, David Newberger <david.newber...@wandcorp.com> 
wrote:
> If you're asking how to handle no messages in a batch window then I would add 
> an isEmpty check like:
>
> dStream.foreachRDD(rdd => {
>   if (!rdd.isEmpty()) {
>     // process the non-empty batch here
>   }
> })
>
> Or something like that.
>
>
> David Newberger
>
> -Original Message-
> From: Yogesh Vyas [mailto:informy...@gmail.com]
> Sent: Wednesday, June 15, 2016 6:31 AM
> To: user
> Subject: Handle empty kafka in Spark Streaming
>
> Hi,
>
> Does anyone know how to handle an empty Kafka topic while a Spark Streaming job is 
> running?
>
> Regards,
> Yogesh
>
>


RE: Handle empty kafka in Spark Streaming

2016-06-15 Thread David Newberger
If you're asking how to handle no messages in a batch window then I would add 
an isEmpty check like:

dStream.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    // process the non-empty batch here
  }
})

Or something like that. 


David Newberger

-Original Message-
From: Yogesh Vyas [mailto:informy...@gmail.com] 
Sent: Wednesday, June 15, 2016 6:31 AM
To: user
Subject: Handle empty kafka in Spark Streaming

Hi,

Does anyone know how to handle an empty Kafka topic while a Spark Streaming job is 
running?

Regards,
Yogesh




RE: streaming example has error

2016-06-15 Thread David Newberger
Have you tried to “set spark.driver.allowMultipleContexts = true”?

David Newberger

From: Lee Ho Yeung [mailto:jobmatt...@gmail.com]
Sent: Tuesday, June 14, 2016 8:34 PM
To: user@spark.apache.org
Subject: streaming example has error

When I simulate streaming with nc -lk , I get the error below;
then I try the example:

martin@ubuntu:~/Downloads$ /home/martin/Downloads/spark-1.6.1/bin/run-example 
streaming.NetworkWordCount localhost 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/14 18:33:06 INFO StreamingExamples: Setting log level to [WARN] for 
streaming example. To override add a custom log4j.properties to the classpath.
16/06/14 18:33:06 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
16/06/14 18:33:06 WARN Utils: Your hostname, ubuntu resolves to a loopback 
address: 127.0.1.1; using 192.168.157.134 instead (on interface eth0)
16/06/14 18:33:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another 
address
16/06/14 18:33:13 WARN SizeEstimator: Failed to check whether UseCompressedOops 
is set; assuming yes

I get an error there too.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", )
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
ssc.start()
ssc.awaitTermination()



scala> import org.apache.spark._
import org.apache.spark._

scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._

scala> import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext._

scala> val conf = new 
SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
conf: org.apache.spark.SparkConf = 
org.apache.spark.SparkConf@67bcaf

scala> val ssc = new StreamingContext(conf, Seconds(1))
16/06/14 18:28:44 WARN AbstractLifeCycle: FAILED 
SelectChannelConnector@0.0.0.0:4040:
 java.net.BindException: Address already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:433)
at sun.nio.ch.Net.bind(Net.java:425)
at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at 
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at 
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at 
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at 
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:252)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at org.apache.spark.ui.JettyUtils$$anonfun$5.apply(JettyUtils.scala:262)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1988)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1979)
at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:262)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:136)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:481)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext.(SparkContext.scala:481)
at 
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
at 
org.apache.spark.streaming.StreamingContext.(StreamingContext.scala:81)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:41)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:43)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:45)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:47)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:49)
at 
$line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
at $line37.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:53)
at 
