[apache-spark] documentation on File Metadata _metadata struct

2024-01-10 Thread Jason Horner
All, the only documentation about the File Metadata ( hidden_metadata struct) I can seem to find is on the databricks website https://docs.databricks.com/en/ingestion/file-metadata-column.html#file-metadata-column for reference here is the struct:_metadata: struct (nullable = false) |-- file_path: string (nullable = false) |-- file_name: string (nullable = false) |-- file_size: long (nullable = false) |-- file_block_start: long (nullable = false) |-- file_block_length: long (nullable = false) |-- file_modification_time: timestamp (nullable = false)  As far as I can tell this feature was released as part of spark 3.20 based on this stack overflow post https://stackoverflow.com/questions/62846669/can-i-get-metadata-of-files-reading-by-spark/77238087#77238087 unfortunately I wasn’t able to locate this in the release notes. Though I may have missed it somehow. So I have  the following questions and seeking guidance from the list at how to best approach this Is the documentation “missing” from the spark 3.20 site or am I just unable to find it:While it provides the file_modification_time, there doesn’t seem to be a corresponding file_creation_time Would both of these be issues that should be opened in JIRA?  Both of these seem like simple and useful things to add but are above my ability to submit PR’s for without some guidance. I’m happy to help especially with a documentation PR’ if someone can confirm and get me started in the right direction. I don’t really have the java / scala skills needed to implement the feature.  Thanks for any pointers  

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark 3 migration question

2022-05-18 Thread Jason Xu
Hi Spark user group,

Spark 2.4 to 3 migration for existing Spark jobs seems a big challenge
given a long list of changes in migration guide
,
they could introduce failures or output changes related to behavior changes
in Spark 3. This makes the migration risky if we don't identify and fix
changes in the migration guide.

However, the guide is a bit high level. For some items in the guide, I
don't know how to use an example query/job to compare behavior between 2.4
and 3.x.  A specific example:

   - In Spark version 2.4 and below, you can create map values with map
   type key via built-in function such as CreateMap, MapFromArrays, etc. In
   Spark 3.0, it’s not allowed to create map values with map type key with
   these built-in functions. Users can use map_entries function to convert map
   to array

[Announcement] Analytics Zoo 0.11.0 release

2021-07-21 Thread Jason Dai
Hi Everyone,


I’m happy to announce the 0.11.0 release
<https://github.com/intel-analytics/analytics-zoo/releases/tag/v0.11.0>
of Analytics
Zoo <https://github.com/intel-analytics/analytics-zoo/> (distributed
TensorFlow and PyTorch on Apache Spark & Ray); the highlights of this
release include:

   - Chronos
   
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Chronos/Overview/chronos.html>:
   a new time-series analysis library with AutoML:
  - Built-in support of ~100 algorithms for time series forecast
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Chronos/QuickStart/chronos-tsdataset-forecaster-quickstart.html>
(e.g.,
  TCN, seq2seq, ARIMA, Prophet, etc.), anomaly detection
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Chronos/QuickStart/chronos-anomaly-detector.html>
(e.g.,
  DBScan, AutoEncoder etc.), and feature transformations (using
  TSDataset
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Chronos/QuickStart/chronos-tsdataset-forecaster-quickstart.html>
  ).
  - Automatic tuning of built-in models (e.g., AutoProphet
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/PythonAPI/Chronos/autotsestimator.html#chronos-autots-model-auto-prophet>
  , AutoARIMA
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/PythonAPI/Chronos/autotsestimator.html#chronos-autots-model-auto-arima>
  , AutoXGBoost
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Orca/QuickStart/orca-autoxgboost-quickstart.html>,
  etc.) using AutoML
  - Simple APIs for tuning user-defined models (including PyTorch and
  Keras) with AutoML
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Orca/QuickStart/orca-autoestimator-pytorch-quickstart.html>
  - Improved APIs
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/PythonAPI/Chronos/index.html>
  , documentation
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/Chronos/Overview/chronos.html>,
  quick start examples
  
<https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/UserGuide/notebooks.html>,
  etc.


   - Reference implementation of large-scale feature transformation
   pipelines for recommendation systems (e.g., DLRM
   
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.11/pyzoo/zoo/examples/friesian/feature/dlrm>
   , DIEN
   
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.11/pyzoo/zoo/examples/friesian/feature/dien>
   , W
   
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.11/pyzoo/zoo/examples/friesian/feature/wnd>,
   etc.)


   - Enhancements to Orca (scaling TF/PyTorch models to distributed Big
   Data) for end-to-end computer vision pipelines (distributed image
   preprocessing, training and inference); for more information, please see
   our CPVR 2021 tutorial <https://jason-dai.github.io/cvpr2021/>.


   - Initial Python and PySpark (in addition to Scala/Java) application
   support for PPML
   <https://analytics-zoo.readthedocs.io/en/v0.11.0/doc/PPML/Overview/ppml.html>
(privacy
   preserving big data and machine learning)

For more details, please see our github repo
<https://github.com/intel-analytics/analytics-zoo> and document website
<https://analytics-zoo.readthedocs.io/>.


Thanks,

-Jason


[Announcement] Analytics Zoo 0.10.0 release

2021-04-28 Thread Jason Dai
Hi Everyone,



I’m happy to announce the 0.10.0 release
<https://github.com/intel-analytics/analytics-zoo/releases/tag/v0.10.0> for
Analytics Zoo (distributed TensorFlow and PyTorch on Apache Spark/Flink &
Ray); the highlights of this release include:

   - A re-designed document website <https://analytics-zoo.readthedocs.io/>
   for the following improvements:
  - Orca
  
<https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/Orca/Overview/orca.html>
  (simplified APIs for running distributed TF, PyTorch, BigDL and
OpenVINO on
  Spark/Ray)
  - RayOnSpark
  
<https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/Ray/Overview/ray.html>
  (running Ray program inline with the Spark code)
  - Spark ML
  
<https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/UseCase/nnframes.html>
  pipeline and Keras-like
  
<https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/UseCase/keras-api.html>
  APIs for BigDL
  - Zouwu
  
<https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/Zouwu/Overview/zouwu.html>
  (time series framework with AutoML)
  - Ready-to-run colab notebooks
  
<https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/UserGuide/notebooks.html>
  (for Orca, RayOnSpark, Zouwu, etc.)


   - Experimental PPML
   <https://analytics-zoo.readthedocs.io/en/v0.10.0/doc/PPML/Overview/ppml.html>
   support for Privacy Preserving Big Data AI
  - Running unmodified Apache Spark/Flink and TF/PyTorch/BigDL/OpenVINO
  in a secure fashion on cloud



 Thanks,

-Jason


Fwd: [Announcement] Analytics Zoo 0.8 release

2020-04-27 Thread Jason Dai
FYI :-)

-- Forwarded message -
From: Jason Dai 
Date: Tue, Apr 28, 2020 at 10:31 AM
Subject: [Announcement] Analytics Zoo 0.8 release
To: BigDL User Group 

Hi all,



We are happy to announce the 0.8 release of Analytics Zoo
<https://github.com/intel-analytics/analytics-zoo/>, a unified Data
Analytics and AI platform for *distributed TensorFlow, Keras, PyTorch,
BigDL, Apache Spark/Flink and Ray**. S*ome of the notable new features in
this release are:



   - First official release of AutoML support
   <https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/AutoML/overview/>
   - Improved support for running Analytics Zoo on K8s
   <https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/k8s/>
   - Improvement to tfpark
   <https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/TFPark/tensorflow/>
(distributed
   TensorFlow on Spark), including seamless scale-out of
*tf.data.Dataset pipelines
   on Spark
   
<https://analytics-zoo.github.io/master/#ProgrammingGuide/TFPark/how-to-import-data/%23working-with-data-files-including-csv-files-text-files-and-tfrecord-files>*,
   support of *Spark Dataframes in TFDataSet
   
<https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/TFPark/how-to-import-data/%23working-with-rdd-or-spark-dataframe-data>*,
   support for pre-made TensorFlow Estimator, etc.
   - Improvement to Cluster Serving
   
<https://analytics-zoo.github.io/0.8.1/#ClusterServingGuide/ProgrammingGuide/>
(automatically
   distributed serving of DL models), including support for performance mode,
   better TensorBoard integration, etc.
   - Improvement to time series analysis (including new MTNet model and
   project Zouwu <https://analytics-zoo.github.io/0.8.1/#Zouwu/overview/>)
   - Upgrading OpenVINO support to 2020 R1



For more details, you may refer to the project website at
https://github.com/intel-analytics/analytics-zoo/, and the Getting Started
<https://analytics-zoo.github.io/0.8.1/#gettingstarted/> page.



Thanks,

-Jason


[Announcement] Analytics Zoo 0.8 release

2020-04-27 Thread Jason Dai
Hi all,



We are happy to announce the 0.8 release of Analytics Zoo
<https://github.com/intel-analytics/analytics-zoo/>, a unified Data
Analytics and AI platform for *distributed TensorFlow, Keras, PyTorch,
BigDL, Apache Spark/Flink and Ray**. S*ome of the notable new features in
this release are:



   - First official release of AutoML support
   <https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/AutoML/overview/>
   - Improved support for running Analytics Zoo on K8s
   <https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/k8s/>
   - Improvement to tfpark
   <https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/TFPark/tensorflow/>
   (distributed TensorFlow on Spark), including seamless scale-out of
*tf.data.Dataset
   pipelines on Spark
   
<https://analytics-zoo.github.io/master/#ProgrammingGuide/TFPark/how-to-import-data/#working-with-data-files-including-csv-files-text-files-and-tfrecord-files>*,
   support of *Spark Dataframes in TFDataSet
   
<https://analytics-zoo.github.io/0.8.1/#ProgrammingGuide/TFPark/how-to-import-data/#working-with-rdd-or-spark-dataframe-data>*,
   support for pre-made TensorFlow Estimator, etc.
   - Improvement to Cluster Serving
   
<https://analytics-zoo.github.io/0.8.1/#ClusterServingGuide/ProgrammingGuide/>
   (automatically distributed serving of DL models), including support for
   performance mode, better TensorBoard integration, etc.
   - Improvement to time series analysis (including new MTNet model and
   project Zouwu <https://analytics-zoo.github.io/0.8.1/#Zouwu/overview/>)
   - Upgrading OpenVINO support to 2020 R1



For more details, you may refer to the project website at
https://github.com/intel-analytics/analytics-zoo/, and the Getting Started
<https://analytics-zoo.github.io/0.8.1/#gettingstarted/> page.



Thanks,

-Jason


Re: [Announcement] Analytics Zoo 0.7.0 release

2020-01-20 Thread Jason Dai
Fixed one typo below: should be TensorFlow 1.15 support in tfpark :-)

On Tue, Jan 21, 2020 at 7:52 AM Jason Dai  wrote:

> Hi all,
>
>
>
> We are happy to announce the 0.7.0 release of Analytics Zoo
> <https://github.com/intel-analytics/analytics-zoo/>, a unified Data
> Analytics and AI platform for *distributed TensorFlow, Keras, PyTorch,
> BigDL, Apache Spark/Flink and Ray. S*ome of the notable new features in
> this release are:
>
>
>
>- *Cluster Serving*, which provides automatically distributed (*TensorFlow,
>PyTorch, Caffe, BigDL and OpenVINO*) model serving across (*YARN or
>K8s*) clusters with a simple pub/sub API; for more details, please
>refer to here
>
> <https://analytics-zoo.github.io/0.7.0/#ClusterServingGuide/ProgrammingGuide/>
>.
>
>
>- Improvements to tfpark
>
> <https://analytics-zoo.github.io/0.7.0/#ProgrammingGuide/tensorflow/>(distributed
>TensorFlow on Spark), including GAN
>
> <https://github.com/intel-analytics/analytics-zoo/blob/branch-0.7/pyzoo/zoo/examples/tensorflow/tfpark/README.md#run-the-gan-example-after-pip-install>support,
>TensorFlow *1.15* support, and various improvements to TFDataSet APIs,
>etc.
>
>
>- Improved distributed PyTorch
><https://analytics-zoo.github.io/0.7.0/#ProgrammingGuide/pytorch/>training
>on Spark, improved support for running Analytics Zoo on K8s and
>Databricks
>
> <https://github.com/intel-analytics/analytics-zoo/blob/branch-0.7/docs/docs/PlatformGuide/AnalyticsZoo-on-Databricks.md>,
>BigDL 0.10.0 support, etc.
>
>
> For more details, you may refer to the project website at
> https://github.com/intel-analytics/analytics-zoo/
>
>
> Thanks,
>
> -Jason
>


[Announcement] Analytics Zoo 0.7.0 release

2020-01-20 Thread Jason Dai
Hi all,



We are happy to announce the 0.7.0 release of Analytics Zoo
<https://github.com/intel-analytics/analytics-zoo/>, a unified Data
Analytics and AI platform for *distributed TensorFlow, Keras, PyTorch,
BigDL, Apache Spark/Flink and Ray. S*ome of the notable new features in
this release are:



   - *Cluster Serving*, which provides automatically distributed (*TensorFlow,
   PyTorch, Caffe, BigDL and OpenVINO*) model serving across (*YARN or K8s*)
   clusters with a simple pub/sub API; for more details, please refer to
   here
   
<https://analytics-zoo.github.io/0.7.0/#ClusterServingGuide/ProgrammingGuide/>
   .


   - Improvements to tfpark
   
<https://analytics-zoo.github.io/0.7.0/#ProgrammingGuide/tensorflow/>(distributed
   TensorFlow on Spark), including GAN
   
<https://github.com/intel-analytics/analytics-zoo/blob/branch-0.7/pyzoo/zoo/examples/tensorflow/tfpark/README.md#run-the-gan-example-after-pip-install>support,
   TensorFlow 1.5 support, and various improvements to TFDataSet APIs, etc.


   - Improved distributed PyTorch
   <https://analytics-zoo.github.io/0.7.0/#ProgrammingGuide/pytorch/>training
   on Spark, improved support for running Analytics Zoo on K8s and
   Databricks
   
<https://github.com/intel-analytics/analytics-zoo/blob/branch-0.7/docs/docs/PlatformGuide/AnalyticsZoo-on-Databricks.md>,
   BigDL 0.10.0 support, etc.


For more details, you may refer to the project website at
https://github.com/intel-analytics/analytics-zoo/


Thanks,

-Jason


Re: Why Apache Spark doesn't use Calcite?

2020-01-13 Thread Jason Nerothin
The implementation they chose supports push down predicates, Datasets and
other features that are not available in Calcite:

https://databricks.com/glossary/catalyst-optimizer

On Mon, Jan 13, 2020 at 8:24 AM newroyker  wrote:

> Was there a qualitative or quantitative benchmark done before a design
> decision was made not to use Calcite?
>
> Are there limitations (for heuristic based, cost based, * aware optimizer)
> in Calcite, and frameworks built on top of Calcite? In the context of big
> data / TCPH benchmarks.
>
> I was unable to dig up anything concrete from user group / Jira. Appreciate
> if any Catalyst veteran here can give me pointers. Trying to defend
> Spark/Catalyst.
>
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Thanks,
Jason


alternatives to shading

2019-12-17 Thread Jason Nerothin
Our build is complex; it uses a large number of third party jars and
generates an uber jar that is shaded before we pass it to spark submit. We
shade to avoid ClassLoader collisions with Spark platform dependencies
(e.g. protobuf 3).

Managing the dependencies/shade is cumbersome and error prone. The shade
plugin itself takes a long time to run. Some jars (e.g. apache commons
products) use classpath reflection - therefore the build does not fail
until runtime.

We have attempted the spark.{user,executor}.userClasspathFirst settings,
but they're marked as experimental and fail sometimes.

We are considering implementing our own ClassLoaders and/or rebuilding and
shading the spark distribution.

Are there better alternatives?

-- 
Thanks,
Jason


[Announcement] Analytics Zoo 0.5 release

2019-06-17 Thread Jason Dai
Hi all,


We are happy to announce the 0.5 release of Analytics Zoo
<https://github.com/intel-analytics/analytics-zoo/>, a unified Analytics +
AI platform for *distributed TensorFlow, Keras & BigDL on Apache Spark*;
some of the notable new features in this release are:


   - tfpark
   <https://analytics-zoo.github.io/0.5.1/#ProgrammingGuide/tensorflow/>:
   distributed TensorFlow for Apache Spark
  - Support Keras
  <https://analytics-zoo.github.io/0.5.1/#APIGuide/TFPark/model/> and
  TensorFlow Estimator
  <https://analytics-zoo.github.io/0.5.1/#APIGuide/TFPark/estimator/>
  APIs
  - Built-in NLP models
  <https://analytics-zoo.github.io/0.5.1/#APIGuide/TFPark/text-models/>
  (including NER, Intent Extraction and POS Tagging)
  - Premade BERT estimators
  <https://analytics-zoo.github.io/0.5.1/#APIGuide/TFPark/bert-classifier/>
  (e.g., BERT Classifier)


   - Model Inference improvement, including
  - Int8
  
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.5/zoo/src/main/scala/com/intel/analytics/zoo/examples/vnni>
(DL
  Boost/VNNI) inference support based on BigDL and OpenVINO
  - Distributed, streaming inference
  
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.5/apps/model-inference-examples>
  using Spark Streaming and Apache Flink


   - Additional built-in models for BigDL, including fine-tuning pipeline
   for SSD
   
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.5/zoo/src/main/scala/com/intel/analytics/zoo/examples/objectdetection/finetune/ssd>,
   Transformer
   
<https://github.com/intel-analytics/analytics-zoo/tree/branch-0.5/pyzoo/zoo/examples/attention>,
   BERT
   
<https://github.com/intel-analytics/analytics-zoo/blob/branch-0.5/pyzoo/zoo/pipeline/api/keras/layers/self_attention.py#L226>,
   etc.


   - Support for BigDL 0.8.0; please see the download page
   <https://analytics-zoo.github.io/master/#release-download/> for the
   supported versions and releases.

 For more details, you may refer to the project website at
https://github.com/intel-analytics/analytics-zoo/



Thanks,

-Jason


Re: Streaming job, catch exceptions

2019-05-21 Thread Jason Nerothin
Yes.

If the job fails repeatedly (4 times in this case), Spark assumes that
there is a problem in the Job and notifies the user. In exchange for this,
the engine can go on to serve other jobs with its available resources.

I would try the following until things improve:

1. Figure out what's wrong with the Job that's failing
2. Make exception handling more functional:
http://oneeyedmen.com/failure-is-not-an-option-part-4.html (kotlin, but
ideas still work)

Why do #2? Because it will force you to decide which exceptions: your code
should handle, spark should handle, should cause job failure (current
behavior).

By analogy to the halting problem
<https://en.wikipedia.org/wiki/Chaitin%27s_constant#Relationship_to_the_halting_problem>,
I believe that expecting a program to handle all possible exceptional
states is unreasonable.

Jm2c
Jason


On Tue, May 21, 2019 at 9:30 AM bsikander  wrote:

> umm, i am not sure if I got this fully.
>
> It is a design decision to not have context.stop() right after
> awaitTermination throws exception?
>
> So, the ideology is that if after n tries (default 4) a task fails, the
> spark should fail fast and let user know? Is this correct?
>
>
> As you mentioned there are many error classes and as the chances of getting
> an exception are quite high. If the above ideology is correct then it makes
> it really hard to keep the job up and running all the time especially
> streaming cases.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Thanks,
Jason


Re: Streaming job, catch exceptions

2019-05-21 Thread Jason Nerothin
Correction: The Driver manages the Tasks, the resource manager serves up
resources to the Driver or Task.

On Tue, May 21, 2019 at 9:11 AM Jason Nerothin 
wrote:

> The behavior is a deliberate design decision by the Spark team.
>
> If Spark were to "fail fast", it would prevent the system from recovering
> from many classes of errors that are in principle recoverable (for example
> if two otherwise unrelated jobs cause a garbage collection spike on the
> same node). Checkpointing and similar features have been added to support
> high availability and platform resilience.
>
> As regards the more general topic of exception handling and recovery, I
> side with Bruce Eckel <https://www.artima.com/intv/handcuffs.html> and
> (for Java) Josh Bloch (see Effective Java, Exception Handling). The
> Scala+functional community is similarly opinionated against using
> exceptions for explicit control flow. (scala.util.Try is useful for
> supporting libraries that don't share this opinion.)
>
> Higher-level design thoughts:
>
> I recommend reading Chapter 15 of Chambers & Zaharia's *Spark The
> Definitive Guide *(at least)*. *The Spark engine makes some assumptions
> about execution boundaries being managed by Spark (that the Spark Jobs get
> broken into Tasks on the Executor and are managed by the resource manager).
> If multiple Threads are executing within a given Task, I would expect
> things like data exchange/shuffle to get unpredictable.
>
> Said a different way: Spark is a micro-batch architecture, even when using
> the streaming apis. The Spark Application is assumed to be relatively
> light-weight (the goal is to parallelize execution across big data, after
> all).
>
> You might also look at the way the Apache Livy
> <https://livy.incubator.apache.org/> team is implementing their solution.
>
> HTH
> Jason
>
>
> On Tue, May 21, 2019 at 6:04 AM bsikander  wrote:
>
>> Ok, I found the reason.
>>
>> In my QueueStream example, I have a while(true) which keeps on adding the
>> RDDs, my awaitTermination call if after the while loop. Since, the while
>> loop never exits, awaitTermination never gets fired and never get reported
>> the exceptions.
>>
>>
>> The above was just the problem with the code that I tried to show my
>> problem
>> with.
>>
>> My real problem was due to the shutdown behavior of Spark. Spark streaming
>> does the following
>>
>> - context.start() triggers the pipeline, context.awaitTerminate() block
>> the
>> current thread, whenever an exception is reported, awaitTerminated throws
>> an
>> exception. Since generally, we never have any code after awaitTerminate,
>> the
>> shutdown hooks get called which stops the spark context.
>>
>> - I am using spark-jobserver, when an exception is reported from
>> awaitTerminate, jobserver catches the exception and updates the status of
>> job in database but the driver process keeps on running because the main
>> thread in driver is waiting for an Akka actor to shutdown which belongs to
>> jobserver. Since, it never shutsdown, the driver keeps on running and no
>> one
>> executes a context.stop(). Since context.stop() is not executed, the
>> jobschedular and generator keeps on running and job also keeps on going.
>>
>> This implicit behavior of Spark where it relies on shutdown hooks to close
>> the context is a bit strange. I believe that as soon as an exception is
>> reported, the spark should just execute context.stop(). This behavior can
>> have serious consequence e.g. data loss. Will fix it though.
>>
>> What is your opinion on stopping the context as soon as an exception is
>> raised?
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Thanks,
> Jason
>


-- 
Thanks,
Jason


Re: Streaming job, catch exceptions

2019-05-21 Thread Jason Nerothin
The behavior is a deliberate design decision by the Spark team.

If Spark were to "fail fast", it would prevent the system from recovering
from many classes of errors that are in principle recoverable (for example
if two otherwise unrelated jobs cause a garbage collection spike on the
same node). Checkpointing and similar features have been added to support
high availability and platform resilience.

As regards the more general topic of exception handling and recovery, I
side with Bruce Eckel <https://www.artima.com/intv/handcuffs.html> and (for
Java) Josh Bloch (see Effective Java, Exception Handling). The
Scala+functional community is similarly opinionated against using
exceptions for explicit control flow. (scala.util.Try is useful for
supporting libraries that don't share this opinion.)

Higher-level design thoughts:

I recommend reading Chapter 15 of Chambers & Zaharia's *Spark The
Definitive Guide *(at least)*. *The Spark engine makes some assumptions
about execution boundaries being managed by Spark (that the Spark Jobs get
broken into Tasks on the Executor and are managed by the resource manager).
If multiple Threads are executing within a given Task, I would expect
things like data exchange/shuffle to get unpredictable.

Said a different way: Spark is a micro-batch architecture, even when using
the streaming apis. The Spark Application is assumed to be relatively
light-weight (the goal is to parallelize execution across big data, after
all).

You might also look at the way the Apache Livy
<https://livy.incubator.apache.org/> team is implementing their solution.

HTH
Jason


On Tue, May 21, 2019 at 6:04 AM bsikander  wrote:

> Ok, I found the reason.
>
> In my QueueStream example, I have a while(true) which keeps on adding the
> RDDs, my awaitTermination call if after the while loop. Since, the while
> loop never exits, awaitTermination never gets fired and never get reported
> the exceptions.
>
>
> The above was just the problem with the code that I tried to show my
> problem
> with.
>
> My real problem was due to the shutdown behavior of Spark. Spark streaming
> does the following
>
> - context.start() triggers the pipeline, context.awaitTerminate() block the
> current thread, whenever an exception is reported, awaitTerminated throws
> an
> exception. Since generally, we never have any code after awaitTerminate,
> the
> shutdown hooks get called which stops the spark context.
>
> - I am using spark-jobserver, when an exception is reported from
> awaitTerminate, jobserver catches the exception and updates the status of
> job in database but the driver process keeps on running because the main
> thread in driver is waiting for an Akka actor to shutdown which belongs to
> jobserver. Since, it never shutsdown, the driver keeps on running and no
> one
> executes a context.stop(). Since context.stop() is not executed, the
> jobschedular and generator keeps on running and job also keeps on going.
>
> This implicit behavior of Spark where it relies on shutdown hooks to close
> the context is a bit strange. I believe that as soon as an exception is
> reported, the spark should just execute context.stop(). This behavior can
> have serious consequence e.g. data loss. Will fix it though.
>
> What is your opinion on stopping the context as soon as an exception is
> raised?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Thanks,
Jason


Re: Why do we need Java-Friendly APIs in Spark ?

2019-05-15 Thread Jason Nerothin
I did a quick google search for "Java/Scala interoperability" and was
surprised to find very few recent results on the topic. (Has the world
given up?)

It's easy to use Java library code from Scala, but the opposite is not true.

I would think about the problem this way: Do *YOU* need to provide a Java
API in your product?

If you decide to support both, beware the Law of Leaky Abstractions
<https://www.joelonsoftware.com/2002/11/11/the-law-of-leaky-abstractions/> and
look at what the Spark team came up with. (DataFrames in version 2.0 target
this same problem (among others) - to provide a single abstraction that
works across Scala, Java, Python, and R. But what they came up with
required the APIs you list to make it work.)

Think carefully about what new things you're trying to provide and what
things you're trying to hide beneath your abstraction.

HTH
Jason




On Wed, May 15, 2019 at 8:24 AM Jean-Georges Perrin  wrote:

> I see… Did you consider Structure Streaming?
>
> Otherwise, you could create a factory that will build your higher level
> object, that will return an interface defining your API,  but the
> implementation may vary based on the context.
>
> And English is not my native language as well...
>
> Jean -Georges Perrin
> j...@jgp.net
>
>
>
>
> On May 14, 2019, at 21:47, Gary Gao  wrote:
>
> Thanks for reply, Jean
>   In my project , I'm working on higher abstraction layer of spark
> streaming to build a data processing product and trying to provide a common
> api for java and scala developers.
>   You can see the abstract class defined here:
> https://github.com/InterestingLab/waterdrop/blob/master/waterdrop-apis/src/main/scala/io/github/interestinglab/waterdrop/apis/BaseStreamingInput.scal
> <https://github.com/InterestingLab/waterdrop/blob/master/waterdrop-apis/src/main/scala/io/github/interestinglab/waterdrop/apis/BaseStreamingInput.scala>
>
>
>There is a method , getDStream, that return a DStream[T], which
> currently support scala class to extend this class and override getDStream,
> But I also want java class to extend this class to return a JavaDStream.
> This is my real problem.
>  Tell me if the above description is not clear, because English is
> not my native language.
>
> Thanks in advance
> Gary
>
> On Tue, May 14, 2019 at 11:06 PM Jean Georges Perrin  wrote:
>
>> There are a little bit more than the list you specified  nevertheless,
>> some data types are not directly compatible between Scala and Java and
>> requires conversion, so it’s good to not pollute your code with plenty of
>> conversion and focus on using the straight API.
>>
>> I don’t remember from the top of my head, but if you use more Spark 2
>> features (dataframes, structured streaming...) you will require less of
>> those Java-specific API.
>>
>> Do you see a problem here? What’s your take on this?
>>
>> jg
>>
>>
>> On May 14, 2019, at 10:22, Gary Gao  wrote:
>>
>> Hi all,
>>
>> I am wondering why do we need Java-Friendly APIs in Spark ? Why can't we
>> just use scala apis in java codes ? What's the difference ?
>>
>> Some examples of Java-Friendly APIs commented in Spark code are as
>> follows:
>>
>> JavaDStream
>> JavaInputDStream
>> JavaStreamingContext
>> JavaSparkContext
>>
>>
>

-- 
Thanks,
Jason


Re: BigDL and Analytics Zoo talks at upcoming Spark+AI Summit and Strata London

2019-05-14 Thread Jason Dai
The slides for the talks have been uploaded to
https://analytics-zoo.github.io/master/#presentations/.

Thanks,
-Jason

On Fri, Apr 19, 2019 at 9:55 PM Khare, Ankit  wrote:

> Thanks for sharing.
>
> Sent from my iPhone
>
> On 19. Apr 2019, at 01:35, Jason Dai  wrote:
>
> Hi all,
>
>
> Please see below for a list of upcoming technical talks
> on BigDL and Analytics Zoo (
> https://github.com/intel-analytics/analytics-zoo/) in the coming weeks:
>
>
>- Engineers from CERN will present a technical talk Deep Learning on
>Apache Spark at CERN’s Large Hardon Collider with Intel Technologies
>
> <https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=193>
> at *Spark + AI Summit in San Francisco* (11:50am-12:20pm, Wednesday,
>24 April 2019)
>- Engineers from Intel will present a technical talk Leveraging NLP
>and Deep Learning for Document Recommendations in the Cloud
><https://databricks.com/sparkaisummit/sessions-single-2019/?id=76> at 
> *Spark
>+ AI Summit in San Francisco* (5:20-6pm, Wednesday, 24 April 2019)
>- Engineers from Dell EMC and Intel will present a technical talk Using
>Deep Learning on Apache Spark to Diagnose Thoracic Pathology from Chest
>X-rays
>
> <https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=119>
> at* Spark + AI Summit in San Francisco* (3:30-4:10pm, Thursday, 25
>April 2019)
>- Engineers from Intel will host a session Game Playing Using AI on
>Apache Spark
><https://databricks.com/sparkaisummit/sessions-single-2019/?id=140> at*
>Spark + AI Summit in San Francisco* (5:30-6:10pm, Thursday, 25 April
>2019)
>- Engineers from Intel will host a session LSTM-based time series
>anomaly detection using Analytics Zoo for Spark and BigDL
>
> <https://conferences.oreilly.com/strata/strata-eu/public/schedule/detail/74077>
>at *Strata Data Conference in London* (16:35-17:15pm, Wednesday, 1 May
>2019)
>
> If you plan to attend these events, please drop by and talk to the
> speakers :-)
>
>
>
> Thanks,
>
> -Jason
>
>


Re: Streaming job, catch exceptions

2019-05-12 Thread Jason Nerothin
Code would be very helpful, but it *seems like* you are:

1. Writing in Java
2. Wrapping the *entire app *in a try/catch
3. Executing in local mode

The code that is throwing the exceptions is not executed locally in the
driver process. Spark is executing the failing code on the cluster.

On Sun, May 12, 2019 at 3:37 PM bsikander  wrote:

> Hi,
> Anyone? This should be a straight forward one :)
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Thanks,
Jason


Re: Deep Learning with Spark, what is your experience?

2019-05-05 Thread Jason Dai
Hi Riccardo,

Yes, you can run Tensorflow distributed training (and inference) inline
with PySpark; see some examples at
https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/zoo/examples/tensorflow/tfpark/estimator_dataset.py
(using
TF Keras API),
https://github.com/intel-analytics/analytics-zoo/blob/master/pyzoo/zoo/examples/tensorflow/tfpark/estimator_dataset.py
(using
TF Estimator API) and
https://github.com/intel-analytics/analytics-zoo/tree/master/pyzoo/zoo/examples/tensorflow/distributed_training
.

For Keras API support in Analytics Zoo, it's a new implementation of Keras
1.2.2 on Spark (using BigDL).

Thanks,
-Jason

On Mon, May 6, 2019 at 5:37 AM Riccardo Ferrari  wrote:

> Thanks everyone, I really appreciate your contributions here.
>
> @Jason, thanks for the references I'll take a look. Quickly checking
> github:
> https://github.com/intel-analytics/analytics-zoo#distributed-tensorflow-and-keras-on-sparkbigdl
> Do I understand correctly I can:
>
>- Prepare my data with Spark
>- Define a Tensorflow model
>- Train it in distributed fashion
>
> When using the Keras API, is it the real Keras with just an adapter layer
> or it si a completely different API that mimic Keras?
>
> @Gurav, I agree that "you should pick the right tool for the job".
>
> The purpose of this discussion is to understand/explore if we really need
> another stack or we can leverage on the existing infrastructure and
> expertise to accomplish the task.
> We currently have some ML jobs and Spark proved to be the perfect fit for
> us. We do know it enough to be confident we can deliver what is asked, it
> scale, it is reslient, it works.
>
> We are starting to evaluate/introduce some DL models, being able to
> leverage on the existing infra it would be a big plus. It is not only
> having to deal with a new set of machines running a different stack (ie
> tensorflow, mxnet, ...) it is everything around it, tuning, managing,
> packing applications, testing and so on. Are reasonable concerns?
>
> Best,
>
> On Sun, May 5, 2019 at 8:06 PM Gourav Sengupta 
> wrote:
>
>> If someone is trying to actually use deep learning algorithms, their
>> focus should be in choosing the technology stack which gives them maximum
>> flexibility to try the nuances of their algorithms.
>>
>> From a personal perspective, I always prefer to use libraries which
>> provides the best flexibility and extensibility in terms of the science/
>> mathematics of the subjects. For example try to open a book on Linear
>> Regression and then try to see whether all the mathematical formulations
>> are available in the SPARK module for regression or not.
>>
>> It is always better to choose a technology that fits into the nuances and
>> perfection of the science, rather than choose a technology and then try to
>> fit the science into it.
>>
>> Regards,
>> Gourav
>>
>> On Sun, May 5, 2019 at 2:23 PM Jason Dai  wrote:
>>
>>> You may find talks from Analytics Zoo users at
>>> https://analytics-zoo.github.io/master/#presentations/; in particular,
>>> some of recent user examples on Analytics Zoo:
>>>
>>>- Mastercard:
>>>
>>> https://software.intel.com/en-us/articles/deep-learning-with-analytic-zoo-optimizes-mastercard-recommender-ai-service
>>>
>>>- Azure:
>>>
>>> https://software.intel.com/en-us/articles/use-analytics-zoo-to-inject-ai-into-customer-service-platforms-on-microsoft-azure-part-1
>>>- CERN:
>>>
>>> https://db-blog.web.cern.ch/blog/luca-canali/machine-learning-pipelines-high-energy-physics-using-apache-spark-bigdl
>>>- Midea/KUKA:
>>>
>>> https://software.intel.com/en-us/articles/industrial-inspection-platform-in-midea-and-kuka-using-distributed-tensorflow-on-analytics
>>>- Talroo:
>>>
>>> https://software.intel.com/en-us/articles/talroo-uses-analytics-zoo-and-aws-to-leverage-deep-learning-for-job-recommendation
>>>
>>> <https://software.intel.com/en-us/articles/talroo-uses-analytics-zoo-and-aws-to-leverage-deep-learning-for-job-recommendations>
>>>
>>> Thanks,
>>> -Jason
>>>
>>> On Sun, May 5, 2019 at 6:29 AM Riccardo Ferrari 
>>> wrote:
>>>
>>>> Thank you for your answers!
>>>>
>>>> While it is clear each DL framework can solve the distributed model
>>>> training on their own (some better than others).  Still I see a lot of
>>>> value of having Spark on the ETL/pre-processing part, thus the origin of my
>>>> question.
>>>&

Re: Deep Learning with Spark, what is your experience?

2019-05-05 Thread Jason Dai
You may find talks from Analytics Zoo users at
https://analytics-zoo.github.io/master/#presentations/; in particular, some
of recent user examples on Analytics Zoo:

   - Mastercard:
   
https://software.intel.com/en-us/articles/deep-learning-with-analytic-zoo-optimizes-mastercard-recommender-ai-service

   - Azure:
   
https://software.intel.com/en-us/articles/use-analytics-zoo-to-inject-ai-into-customer-service-platforms-on-microsoft-azure-part-1
   - CERN:
   
https://db-blog.web.cern.ch/blog/luca-canali/machine-learning-pipelines-high-energy-physics-using-apache-spark-bigdl
   - Midea/KUKA:
   
https://software.intel.com/en-us/articles/industrial-inspection-platform-in-midea-and-kuka-using-distributed-tensorflow-on-analytics
   - Talroo:
   
https://software.intel.com/en-us/articles/talroo-uses-analytics-zoo-and-aws-to-leverage-deep-learning-for-job-recommendation
   
<https://software.intel.com/en-us/articles/talroo-uses-analytics-zoo-and-aws-to-leverage-deep-learning-for-job-recommendations>

Thanks,
-Jason

On Sun, May 5, 2019 at 6:29 AM Riccardo Ferrari  wrote:

> Thank you for your answers!
>
> While it is clear each DL framework can solve the distributed model
> training on their own (some better than others).  Still I see a lot of
> value of having Spark on the ETL/pre-processing part, thus the origin of my
> question.
> I am trying to avoid to mange multiple stacks/workflows and hoping to
> unify my system. Projects like TensorflowOnSpark or Analytics-Zoo (to name
> couple) feels like they can help, still I really appreciate your comments
> and anyone that could add some value to this discussion. Does anyone have
> experience with them?
>
> Thanks
>
> On Sat, May 4, 2019 at 8:01 PM Pat Ferrel  wrote:
>
>> @Riccardo
>>
>> Spark does not do the DL learning part of the pipeline (afaik) so it is
>> limited to data ingestion and transforms (ETL). It therefore is optional
>> and other ETL options might be better for you.
>>
>> Most of the technologies @Gourav mentions have their own scaling based on
>> their own compute engines specialized for their DL implementations, so be
>> aware that Spark scaling has nothing to do with scaling most of the DL
>> engines, they have their own solutions.
>>
>> From: Gourav Sengupta 
>> 
>> Reply: Gourav Sengupta 
>> 
>> Date: May 4, 2019 at 10:24:29 AM
>> To: Riccardo Ferrari  
>> Cc: User  
>> Subject:  Re: Deep Learning with Spark, what is your experience?
>>
>> Try using MxNet and Horovod directly as well (I think that MXNet is worth
>> a try as well):
>> 1.
>> https://medium.com/apache-mxnet/distributed-training-using-apache-mxnet-with-horovod-44f98bf0e7b7
>> 2.
>> https://docs.nvidia.com/deeplearning/dgx/mxnet-release-notes/rel_19-01.html
>> 3. https://aws.amazon.com/mxnet/
>> 4.
>> https://aws.amazon.com/blogs/machine-learning/aws-deep-learning-amis-now-include-horovod-for-faster-multi-gpu-tensorflow-training-on-amazon-ec2-p3-instances/
>>
>>
>> Ofcourse Tensorflow is backed by Google's advertisement team as well
>> https://aws.amazon.com/blogs/machine-learning/scalable-multi-node-training-with-tensorflow/
>>
>>
>> Regards,
>>
>>
>>
>>
>> On Sat, May 4, 2019 at 10:59 AM Riccardo Ferrari 
>> wrote:
>>
>>> Hi list,
>>>
>>> I am trying to undestand if ti make sense to leverage on Spark as
>>> enabling platform for Deep Learning.
>>>
>>> My open question to you are:
>>>
>>>- Do you use Apache Spark in you DL pipelines?
>>>- How do you use Spark for DL? Is it just a stand-alone stage in the
>>>workflow (ie data preparation script) or is it  more integrated
>>>
>>> I see a major advantage in leveraging on Spark as a unified entrypoint,
>>> for example you can easily abstract data sources and leverage on existing
>>> team skills for data pre-processing and training. On the flip side you may
>>> hit some limitations including supported versions and so on.
>>> What is your experience?
>>>
>>> Thanks!
>>>
>>


Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-29 Thread Jason Nerothin
See also here:
https://stackoverflow.com/questions/44671597/how-to-replace-null-values-with-a-specific-value-in-dataframe-using-spark-in-jav

On Mon, Apr 29, 2019 at 5:27 PM Jason Nerothin 
wrote:

> Spark SQL has had an na.fill function on it since at least 2.1. Would that
> work for you?
>
>
> https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameNaFunctions.html
>
> On Mon, Apr 29, 2019 at 4:57 PM Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Snehasish,
>>
>> Do you have a reproducer for this issue?
>>
>> Best Regards,
>> Ryan
>>
>>
>> On Wed, Apr 24, 2019 at 7:24 AM SNEHASISH DUTTA 
>> wrote:
>>
>>> Hi,
>>>
>>> While writing to kafka using spark structured streaming , if all the
>>> values in certain column are Null it gets dropped
>>> Is there any way to override this , other than using na.fill functions
>>>
>>> Regards,
>>> Snehasish
>>>
>>
>
> --
> Thanks,
> Jason
>


-- 
Thanks,
Jason


Re: Handle Null Columns in Spark Structured Streaming Kafka

2019-04-29 Thread Jason Nerothin
Spark SQL has had an na.fill function on it since at least 2.1. Would that
work for you?

https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/DataFrameNaFunctions.html

On Mon, Apr 29, 2019 at 4:57 PM Shixiong(Ryan) Zhu 
wrote:

> Hey Snehasish,
>
> Do you have a reproducer for this issue?
>
> Best Regards,
> Ryan
>
>
> On Wed, Apr 24, 2019 at 7:24 AM SNEHASISH DUTTA 
> wrote:
>
>> Hi,
>>
>> While writing to kafka using spark structured streaming , if all the
>> values in certain column are Null it gets dropped
>> Is there any way to override this , other than using na.fill functions
>>
>> Regards,
>> Snehasish
>>
>

-- 
Thanks,
Jason


Re: Update / Delete records in Parquet

2019-04-22 Thread Jason Nerothin
Hi Chetan,

Do you have to use Parquet?

It just feels like it might be the wrong sink for a high-frequency change
scenario.

What are you trying to accomplish?

Thanks,
Jason

On Mon, Apr 22, 2019 at 2:09 PM Chetan Khatri 
wrote:

> Hello All,
>
> If I am doing incremental load / delta and would like to update / delete
> the records in parquet, I understands that parquet is immutable and can't
> be deleted / updated theoretically only append / overwrite can be done. But
> I can see utility tools which claims to add value for that.
>
> https://github.com/Factual/parquet-rewriter
>
> Please throw a light.
>
> Thanks
>


-- 
Thanks,
Jason


Re: --jars vs --spark.executor.extraClassPath vs --spark.driver.extraClassPath

2019-04-20 Thread Jason Nerothin
Hi Rajat,

A little more color:

The executor classpath will be used by the spark workers/slaves. For
example, all JVMs that are started with $SPARK_HOME/sbin/start-slave.sh. If
you run with --deploy-mode cluster, then the driver itself will be run from
on the cluster (with executor classpath).

If you run with --deploy-mode client, then the Driver will have its own
classpath (and JVM) on the host that you start it from (similar to running
from an IDE).

If you are NOT running from the shell, then it's usually best to build an
uber-jar containing all required jars and use spark-submit to send the
entire classpath to the cluster. Using --packages like this
<https://stackoverflow.com/questions/33928029/how-to-specify-multiple-dependencies-using-packages-for-spark-submit>
is
also a good option for jars that are in a repository (and also resolves
local paths during development).

To run driver code from an IDE, make sure your run/debug configuration is
picking up the spark libs you need as "provided" dependencies (sbt or
maven). This simulates the classpath that's provided by the Spark runtime.
I say 'simulates' because Spark (2.4.1) has access to 226 jar files in
$SPARK_HOME/jars and usually you're implementing against just a few of the
essential ones like spark-sql.

--jars is what to use for spark-shell.

Final related tidbit: If you're implementing in Scala, make sure your jars
are version-compatible with the scala compiler version (2.1.1 as of Spark
2.4.1).

HTH

Jason

On Sat, Apr 20, 2019 at 4:34 AM Subash Prabakar 
wrote:

> Hey Rajat,
>
> The documentation page is self explanatory..
>
> You can refer this for more configs
>
> https://spark.apache.org/docs/2.0.0/configuration.html
>  or any version of Spark documentation
>
> Thanks.
> Subash
>
> On Sat, 20 Apr 2019 at 16:04, rajat kumar 
> wrote:
>
>> Hi,
>>
>> Can anyone pls explain ?
>>
>>
>> On Mon, 15 Apr 2019, 09:31 rajat kumar >
>>> Hi All,
>>>
>>> I came across different parameters in spark submit
>>>
>>> --jars , --spark.executor.extraClassPath , --spark.driver.extraClassPath
>>>
>>> What are the differences between them? When to use which one? Will it
>>> differ
>>> if I use following:
>>>
>>> --master yarn --deploy-mode client
>>> --master yarn --deploy-mode cluster
>>>
>>>
>>> Thanks
>>> Rajat
>>>
>>

-- 
Thanks,
Jason


BigDL and Analytics Zoo talks at upcoming Spark+AI Summit and Strata London

2019-04-18 Thread Jason Dai
Hi all,


Please see below for a list of upcoming technical talks
on BigDL and Analytics Zoo (
https://github.com/intel-analytics/analytics-zoo/) in the coming weeks:


   - Engineers from CERN will present a technical talk Deep Learning on
   Apache Spark at CERN’s Large Hardon Collider with Intel Technologies
   
<https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=193>
at *Spark + AI Summit in San Francisco* (11:50am-12:20pm, Wednesday, 24
   April 2019)
   - Engineers from Intel will present a technical talk Leveraging NLP and
   Deep Learning for Document Recommendations in the Cloud
   <https://databricks.com/sparkaisummit/sessions-single-2019/?id=76> at *Spark
   + AI Summit in San Francisco* (5:20-6pm, Wednesday, 24 April 2019)
   - Engineers from Dell EMC and Intel will present a technical talk Using
   Deep Learning on Apache Spark to Diagnose Thoracic Pathology from Chest
   X-rays
   
<https://databricks.com/sparkaisummit/north-america/sessions-single-2019?id=119>
at* Spark + AI Summit in San Francisco* (3:30-4:10pm, Thursday, 25
   April 2019)
   - Engineers from Intel will host a session Game Playing Using AI on
   Apache Spark
   <https://databricks.com/sparkaisummit/sessions-single-2019/?id=140> at*
   Spark + AI Summit in San Francisco* (5:30-6:10pm, Thursday, 25 April 2019
   )
   - Engineers from Intel will host a session LSTM-based time series
   anomaly detection using Analytics Zoo for Spark and BigDL
   
<https://conferences.oreilly.com/strata/strata-eu/public/schedule/detail/74077>
   at *Strata Data Conference in London* (16:35-17:15pm, Wednesday, 1 May
   2019)

If you plan to attend these events, please drop by and talk to the speakers
:-)



Thanks,

-Jason


Re: Spark2: Deciphering saving text file name

2019-04-09 Thread Jason Nerothin
Hi Subash,

Short answer: It’s effectively random.

Longer answer: In general the DataFrameWriter expects to be receiving data
from multiple partitions. Let’s say you were writing to ORC instead of text.

In this case, even when you specify the output path, the writer creates a
directory at the specified path and saves one of those funny-named files
per partition.

Even longer: Assume you are running atop of YARN (or Messi or K8S...) In
this case, the resource manager is responsible for provisioning disk on
request, and it is the programmers’ responsibility to implement the
upstream business logic.

The implication is that it’s probably not a good idea to violate the
responsibility boundary. Because, if you do, you are probably going to
violate some implicit assumptions that the YARN designers are relying upon.
For example (just making this up): YARN will calculate available disk after
each write action completes.

HTH,
Jason



On Mon, Apr 8, 2019 at 19:55 Subash Prabakar 
wrote:

> Hi,
> While saving in Spark2 as text file - I see encoded/hash value attached in
> the part files along with part number. I am curious to know what is that
> value is about ?
>
> Example:
> ds.write.save(SaveMode.Overwrite).option("compression","gzip").text(path)
>
> Produces,
> part-1-1e4c5369-6694-4012-894a-73b971fe1ab1-c000.txt.gz
>
>
> 1e4c5369-6694-4012-894a-73b971fe1ab1-c000 => what is this value ?
>
> Is there any options available to remove this part or is it attached for
> some reason ?
>
> Thanks,
> Subash
>
-- 
Thanks,
Jason


Re: Structured streaming flatMapGroupWithState results out of order messages when reading from Kafka

2019-04-09 Thread Jason Nerothin
I had that identical problem. Here’s what I came up with:

https://github.com/ubiquibit-inc/sensor-failure


On Tue, Apr 9, 2019 at 04:37 Akila Wajirasena 
wrote:

> Hi
>
> I have a Kafka topic  which is already loaded with data. I use a stateful
> structured streaming pipeline using flatMapGroupWithState to consume the
> data in kafka in a streaming manner.
>
> However when I set shuffle partition count > 1 I get some out of order
> messages in to each of my GroupState. Is this the expected behavior or is
> the message ordering guaranteed when using flatMapGroupWithState with Kafka
> source?
>
> This my pipline;
>
> Kafka => GroupByKey(key from Kafka schema) => flatMapGroupWithState =>
> parquet
>
> When I printed out the Kafka offset for each key inside my state update
> function they are not in order. I am using spark 2.3.3.
>
> Thanks & Regards,
> Akila
>
>
>
> --
Thanks,
Jason


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Jason Nerothin
*I guess I was focusing on this:*

#2
I want to do the above as a event driven way, *without using the batches*
(i tried micro batches, but I realised that’s not what I want), i.e., *for
each arriving event or as soon as a event message come my stream, not by
accumulating the event *

If you want to update your graph without pulling the older data back
through the entire DAG, it seems like you need to store the graph data
somewhere. So that's why I jumped to accumulators - the state would be
around from event to event, and not require a "reaggregation" for each
event.

Arbitrary stateful streaming has this ability "built in" - that is, the
engine stores your intermediate results in RAM and then the next event
picks up where the last one left off.

I've just implemented the arbitrary stateful streaming option... Couldn't
figure out a better way of avoiding the re-shuffle, so ended up keeping the
prior state in the engine.

I'm not using GraphX, but it seems like the approach should work
irrespective - there's an interface called GroupState that you hand off an
iterator for from call to call.

Do keep in mind that you have to think about out of order event arrivals...

Send me a message to my direct email and I'll provide a link to the
source... Not sure I'm fully grokking your entire use case...


On Fri, Apr 5, 2019 at 1:15 PM Basavaraj  wrote:

> I have checked broadcast of accumulated values, but not satellite stateful
> stabbing
>
> But, I am not sure how that helps here
>
> On Fri, 5 Apr 2019, 10:13 pm Jason Nerothin, 
> wrote:
>
>> Have you looked at Arbitrary Stateful Streaming and Broadcast
>> Accumulators?
>>
>> On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:
>>
>>> Hi
>>>
>>> Have two questions
>>>
>>> #1
>>> I am trying to process events in realtime, outcome of the processing has
>>> to find a node in the GraphX and update that node as well (in case if any
>>> anomaly or state change), If a node is updated, I have to update the
>>> related nodes as well, want to know if GraphX can help in this by providing
>>> some native support
>>>
>>> #2
>>> I want to do the above as a event driven way, without using the batches
>>> (i tried micro batches, but I realised that’s not what I want), i.e., for
>>> each arriving event or as soon as a event message come my stream, not by
>>> accumulating the event
>>>
>>> I humbly welcome any pointers, constructive criticism
>>>
>>> Regards
>>> Basav
>>> - To
>>> unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason


Re: combineByKey

2019-04-05 Thread Jason Nerothin
Take a look at this SOF:

https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Thank you for the details. It is a typo error while composing the mail.
> Below is the actual flow.
>
> Any idea, why the combineByKey is not working. aggregateByKey is working.
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
>
> def createCombiner = (Id: String, value: String) => (value :: Nil).toSet
>
> def mergeValue = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
>  sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *aggregateByKey =>*
>
> val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
> (x.Id, x.value))).aggregateByKey(Set[String]())(
> (aggr, value) => aggr ++ Set(value._2),
> (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap
>
>  print(result)
>
> Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
> Set(t1, t2))
>
> Regards,
> Rajesh
>
> On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
> wrote:
>
>> I broke some of your code down into the following lines:
>>
>> import spark.implicits._
>>
>> val a: RDD[Messages]= sc.parallelize(messages)
>> val b: Dataset[Messages] = a.toDF.as[Messages]
>> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp
>> + "-" + x.Id, (x.Id, x.value))}
>>
>> You didn't capitalize .Id and your mergeValue0 and mergeCombiner don't
>> have the types you think for the reduceByKey.
>>
>> I recommend breaking the code down like this to statement-by-statement
>> when you get into a dance with the Scala type system.
>>
>> The type-safety that you're after (that eventually makes life *easier*)
>> is best supported by Dataset (would have prevented the .id vs .Id error).
>> Although there are some performance tradeoffs vs RDD and DataFrame...
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Any issue in the below code.
>>>
>>> case class Messages(timeStamp: Int, Id: String, value: String)
>>>
>>> val messages = Array(
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t3"),
>>>   Messages(0, "d1", "t4"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t5"),
>>>   Messages(0, "d2", "t6"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t2")
>>> )
>>>
>>> //Defining createCombiner, mergeValue and mergeCombiner functions
>>> def createCombiner = (id: String, value: String) => Set(value)
>>>
>>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>>> String)) => accumulator1 ++ Set(accumulator2._2)
>>>
>>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>>> accumulator2
>>>
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> *Compile Error:-*
>>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>>  required: ((String, String)) => ?
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason


Re: Checking if cascading graph computation is possible in Spark

2019-04-05 Thread Jason Nerothin
Have you looked at Arbitrary Stateful Streaming and Broadcast Accumulators?

On Fri, Apr 5, 2019 at 10:55 AM Basavaraj  wrote:

> Hi
>
> Have two questions
>
> #1
> I am trying to process events in realtime, outcome of the processing has
> to find a node in the GraphX and update that node as well (in case if any
> anomaly or state change), If a node is updated, I have to update the
> related nodes as well, want to know if GraphX can help in this by providing
> some native support
>
> #2
> I want to do the above as a event driven way, without using the batches (i
> tried micro batches, but I realised that’s not what I want), i.e., for each
> arriving event or as soon as a event message come my stream, not by
> accumulating the event
>
> I humbly welcome any pointers, constructive criticism
>
> Regards
> Basav
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org



-- 
Thanks,
Jason


Re: combineByKey

2019-04-05 Thread Jason Nerothin
I broke some of your code down into the following lines:

import spark.implicits._

val a: RDD[Messages]= sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
"-" + x.Id, (x.Id, x.value))}

You didn't capitalize .Id and your mergeValue0 and mergeCombiner don't have
the types you think for the reduceByKey.

I recommend breaking the code down like this to statement-by-statement when
you get into a dance with the Scala type system.

The type-safety that you're after (that eventually makes life *easier*) is
best supported by Dataset (would have prevented the .id vs .Id error).
Although there are some performance tradeoffs vs RDD and DataFrame...






On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Any issue in the below code.
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t3"),
>   Messages(0, "d1", "t4"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t5"),
>   Messages(0, "d2", "t6"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t2")
> )
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh
>
>

-- 
Thanks,
Jason


Re: reporting use case

2019-04-04 Thread Jason Nerothin
Hi Prasad,

Could you create an Oracle-side view that captures only the relevant
records and the use Spark JDBC connector to load the view into Spark?

On Thu, Apr 4, 2019 at 1:48 PM Prasad Bhalerao 
wrote:

> Hi,
>
> I am exploring spark for my Reporting application.
> My use case is as follows...
> I have 4-5 oracle tables which contains more than 1.5 billion rows. These
> tables are updated very frequently every day. I don't have choice to change
> database technology. So this data is going to remain in Oracle only.
> To generate 1 report, on an average 15 - 50 million rows has to be fetched
> from oracle tables. These rows contains some blob columns. Most of the time
> is spent in fetching these many rows from db over the network. Data
> processing is not that complex. Currently these report takes around 3-8
> hours to complete. I trying to speed up this report generation process.
>
> Can use spark as a caching layer in this case to avoid fetching data from
> oracle over the network every time? I am thinking to submit a spark job for
> each report request and use spark SQL to fetch the data and then process it
> and write to a file? I trying to use kind of data locality in this case.
>
> Whenever a data is updated in oracle tables can I refresh the data in
> spark storage? I can get the update feed using messaging technology.
>
> Can some one from community help me with this?
> Suggestions are welcome.
>
>
> Thanks,
> Prasad
>
>
>
> Thanks,
> Prasad
>


-- 
Thanks,
Jason


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Jason Nerothin
My thinking is that if you run everything in one partition - say 12 GB -
then you don't experience the partitioning problem - one partition will
have all duplicates.

If that's not the case, there are other options, but would probably require
a design change.

On Thu, Apr 4, 2019 at 8:46 AM Jason Nerothin 
wrote:

> How much memory do you have per partition?
>
> On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri 
> wrote:
>
>> I will get the information and will share with you.
>>
>> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari 
>> wrote:
>>
>>> How long does it take to do the window solution ? (Also mention how many
>>> executors was your spark application using on average during that time)
>>> I am not aware of anything that is faster. When I ran is on my data
>>> ~8-9GB I think it took less than 5 mins (don't remember exact time)
>>>
>>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri <
>>> chetan.opensou...@gmail.com> wrote:
>>>
>>>> Thanks for awesome clarification / explanation.
>>>>
>>>> I have cases where update_time can be same.
>>>> I am in need of suggestions, where I have very large data like 5 GB,
>>>> this window based solution which I mentioned is taking very long time.
>>>>
>>>> Thanks again.
>>>>
>>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <
>>>> abdealikoth...@gmail.com> wrote:
>>>>
>>>>> So, the above code for min() worked for me fine in general, but there
>>>>> was one corner case where it failed.
>>>>> Which was when I have something like:
>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>>
>>>>> In this example, the update_time for 2 records is the exact same. So,
>>>>> doing a filter for the min() will result in 2 records for the 
>>>>> invoice_id=1.
>>>>> This is avoided in your code snippet of row_num - because 2 rows will
>>>>> never have row_num = 1
>>>>>
>>>>> But note that here - row_num=1 and row_num=2 will be randomly ordered
>>>>> (because orderBy is on update_time and they have the same value of
>>>>> update_time).
>>>>> Hence dropDuplicates can be used there cause it can be either one of
>>>>> those rows.
>>>>>
>>>>> Overall - dropDuplicates seems like it's meant for cases where you
>>>>> literally have redundant duplicated data. And not for filtering to get
>>>>> first/last etc.
>>>>>
>>>>>
>>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>
>>>>>> Hello Abdeali, Thank you for your response.
>>>>>>
>>>>>> Can you please explain me this line, And the dropDuplicates at the
>>>>>> end ensures records with two values for the same 'update_time' don't 
>>>>>> cause
>>>>>> issues.
>>>>>>
>>>>>> Sorry I didn't get quickly. :)
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>>>> abdealikoth...@gmail.com> wrote:
>>>>>>
>>>>>>> I've faced this issue too - and a colleague pointed me to the
>>>>>>> documentation -
>>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>>> dropDuplicates docs does not say that it will guarantee that it will
>>>>>>> return the "first" record (even if you sort your dataframe)
>>>>>>> It would give you any record it finds and just ensure that
>>>>>>> duplicates are not present.
>>>>>>>
>>>>>>> The only way I know of how to do this is what you did, but you can
>>>>>>> avoid the sorting inside the partition with something like (in pyspark):
>>>>>>>
>>>>>>> from pyspark.sql import Window, functions as F
>>>>>>> df = df.withColumn('wanted_time',
>>>>>>> F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>>>> out_df = df.filter(df['update_time'] == df['wanted_time'])
>>>>>>> .drop('wanted_time').dropDuplicates('invoice_id', 'update_time')
>>>>>>>
>>>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>>>> And the dropDuplicates at the end ensures records with two values
>>>>>>> for the same 'update_time' don't cause issues.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Dear Spark Users,
>>>>>>>>
>>>>>>>> I am using dropDuplicate on a DataFrame generated from large
>>>>>>>> parquet file from(HDFS) and doing dropDuplicate based on timestamp 
>>>>>>>> based
>>>>>>>> column, every time I run it drops different - different rows based on 
>>>>>>>> same
>>>>>>>> timestamp.
>>>>>>>>
>>>>>>>> What I tried and worked
>>>>>>>>
>>>>>>>> val wSpec = Window.partitionBy($"invoice_
>>>>>>>> id").orderBy($"update_time".desc)
>>>>>>>>
>>>>>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>>>>>> row_number.over(wSpec)).where($"rn" === 1)
>>>>>>>> .drop("rn").drop("update_time")
>>>>>>>>
>>>>>>>> But this is damn slow...
>>>>>>>>
>>>>>>>> Can someone please throw a light.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>
> --
> Thanks,
> Jason
>


-- 
Thanks,
Jason


Re: Question about relationship between number of files and initial tasks(partitions)

2019-04-04 Thread Jason Nerothin
Have you tried something like this?

spark.conf.set("spark.sql.shuffle.partitions", "5" )



On Wed, Apr 3, 2019 at 8:37 PM Arthur Li  wrote:

> Hi Sparkers,
>
> I noticed that in my spark application, the number of tasks in the first
> stage is equal to the number of files read by the application(at least for
> Avro) if the number of cpu cores is less than the number of files. Though
> If cpu cores are more than number of files, it's usually equal to default
> parallelism number. Why is it behave like this? Would this require a lot of
> resource from the driver? Is there any way we can do to decrease the number
> of tasks(partitions) in the first stage without merge files before loading?
>
> Thanks,
> Arthur
>
>
> IMPORTANT NOTICE:  This message, including any attachments (hereinafter
> collectively referred to as "Communication"), is intended only for the 
> addressee(s)
> named above.  This Communication may include information that is
> privileged, confidential and exempt from disclosure under applicable law.
> If the recipient of this Communication is not the intended recipient, or
> the employee or agent responsible for delivering this Communication to the
> intended recipient, you are notified that any dissemination, distribution
> or copying of this Communication is strictly prohibited.  If you have
> received this Communication in error, please notify the sender immediately
> by phone or email and permanently delete this Communication from your
> computer without making a copy. Thank you.



-- 
Thanks,
Jason


Re: dropDuplicate on timestamp based column unexpected output

2019-04-04 Thread Jason Nerothin
How much memory do you have per partition?

On Thu, Apr 4, 2019 at 7:49 AM Chetan Khatri 
wrote:

> I will get the information and will share with you.
>
> On Thu, Apr 4, 2019 at 5:03 PM Abdeali Kothari 
> wrote:
>
>> How long does it take to do the window solution ? (Also mention how many
>> executors was your spark application using on average during that time)
>> I am not aware of anything that is faster. When I ran is on my data
>> ~8-9GB I think it took less than 5 mins (don't remember exact time)
>>
>> On Thu, Apr 4, 2019 at 1:09 PM Chetan Khatri 
>> wrote:
>>
>>> Thanks for awesome clarification / explanation.
>>>
>>> I have cases where update_time can be same.
>>> I am in need of suggestions, where I have very large data like 5 GB,
>>> this window based solution which I mentioned is taking very long time.
>>>
>>> Thanks again.
>>>
>>> On Thu, Apr 4, 2019 at 12:11 PM Abdeali Kothari <
>>> abdealikoth...@gmail.com> wrote:
>>>
>>>> So, the above code for min() worked for me fine in general, but there
>>>> was one corner case where it failed.
>>>> Which was when I have something like:
>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>> invoice_id=1, update_time=*2018-01-01 15:00:00.000*
>>>> invoice_id=1, update_time=2018-02-03 14:00:00.000
>>>>
>>>> In this example, the update_time for 2 records is the exact same. So,
>>>> doing a filter for the min() will result in 2 records for the invoice_id=1.
>>>> This is avoided in your code snippet of row_num - because 2 rows will
>>>> never have row_num = 1
>>>>
>>>> But note that here - row_num=1 and row_num=2 will be randomly ordered
>>>> (because orderBy is on update_time and they have the same value of
>>>> update_time).
>>>> Hence dropDuplicates can be used there cause it can be either one of
>>>> those rows.
>>>>
>>>> Overall - dropDuplicates seems like it's meant for cases where you
>>>> literally have redundant duplicated data. And not for filtering to get
>>>> first/last etc.
>>>>
>>>>
>>>> On Thu, Apr 4, 2019 at 11:46 AM Chetan Khatri <
>>>> chetan.opensou...@gmail.com> wrote:
>>>>
>>>>> Hello Abdeali, Thank you for your response.
>>>>>
>>>>> Can you please explain me this line, And the dropDuplicates at the end
>>>>> ensures records with two values for the same 'update_time' don't cause
>>>>> issues.
>>>>>
>>>>> Sorry I didn't get quickly. :)
>>>>>
>>>>> On Thu, Apr 4, 2019 at 10:41 AM Abdeali Kothari <
>>>>> abdealikoth...@gmail.com> wrote:
>>>>>
>>>>>> I've faced this issue too - and a colleague pointed me to the
>>>>>> documentation -
>>>>>> https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates
>>>>>> dropDuplicates docs does not say that it will guarantee that it will
>>>>>> return the "first" record (even if you sort your dataframe)
>>>>>> It would give you any record it finds and just ensure that duplicates
>>>>>> are not present.
>>>>>>
>>>>>> The only way I know of how to do this is what you did, but you can
>>>>>> avoid the sorting inside the partition with something like (in pyspark):
>>>>>>
>>>>>> from pyspark.sql import Window, functions as F
>>>>>> df = df.withColumn('wanted_time',
>>>>>> F.min('update_time').over(Window.partitionBy('invoice_id')))
>>>>>> out_df = df.filter(df['update_time'] == df['wanted_time'])
>>>>>> .drop('wanted_time').dropDuplicates('invoice_id', 'update_time')
>>>>>>
>>>>>> The min() is faster than doing an orderBy() and a row_number().
>>>>>> And the dropDuplicates at the end ensures records with two values for
>>>>>> the same 'update_time' don't cause issues.
>>>>>>
>>>>>>
>>>>>> On Thu, Apr 4, 2019 at 10:22 AM Chetan Khatri <
>>>>>> chetan.opensou...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello Dear Spark Users,
>>>>>>>
>>>>>>> I am using dropDuplicate on a DataFrame generated from large parquet
>>>>>>> file from(HDFS) and doing dropDuplicate based on timestamp based column,
>>>>>>> every time I run it drops different - different rows based on same
>>>>>>> timestamp.
>>>>>>>
>>>>>>> What I tried and worked
>>>>>>>
>>>>>>> val wSpec = Window.partitionBy($"invoice_
>>>>>>> id").orderBy($"update_time".desc)
>>>>>>>
>>>>>>> val irqDistinctDF = irqFilteredDF.withColumn("rn",
>>>>>>> row_number.over(wSpec)).where($"rn" === 1)
>>>>>>> .drop("rn").drop("update_time")
>>>>>>>
>>>>>>> But this is damn slow...
>>>>>>>
>>>>>>> Can someone please throw a light.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>>

-- 
Thanks,
Jason


Re: Upcoming talks on BigDL and Analytics Zoo this week

2019-04-03 Thread Jason Dai
The slides of the two technical talks for BigDL and Analytics Zoo (
https://github.com/intel-analytics/analytics-zoo/) at Strata Data
Conference have been uploaded to
https://analytics-zoo.github.io/master/#presentations/.

Thanks,
-Jason

On Sun, Mar 24, 2019 at 9:03 PM Jason Dai  wrote:

> Hi all,
>
>
>
> Please see below for a list of upcoming technical sessions
> on BigDL and Analytics Zoo (
> https://github.com/intel-analytics/analytics-zoo/) this week:
>
>
>- Engineers from Intel will deliver a 3-hour tutorial Analytics Zoo:
>Distributed TensorFlow and Keras on Apache Spark
>
> <https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/72802>
> at *Strata Data Conference in San Francisco* (March 26, 1:30–5:00pm)
>- Engineers from Office Depot and Intel will present a technical talk 
> User-based
>real-time product recommendations leveraging deep learning using Analytics
>Zoo on Apache Spark and BigDL
>
> <https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/73079>
> at *Strata Data Conference in San Francisco* (March 27, 4:20–5:00pm)
>- Engineers from Intel will host a poster session Analytics Zoo:
>Unified Analytics + AI Platform for Big Data
><http://scaledml.org/2019/> at *ScaledML 2019 in Mountain View* (March
>27, 5:00–7:00PM)
>- Engineers from Intel will present a technical talk Analytics Zoo:
>Distributed TensorFlow in production on Apache Spark
>
> <https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/72802>
> at *Strata Data Conference in San Francisco* (March 28, 3:50–4:30pm)
>
> If you plan to attend these events, please drop by and talk to the
> speakers :-)
>
>
>
> Thanks,
>
> -Jason
>


Re: How to extract data in parallel from RDBMS tables

2019-04-02 Thread Jason Nerothin
I can *imagine* writing some sort of DataframeReader-generation tool, but
am not aware of one that currently exists.

On Tue, Apr 2, 2019 at 13:08 Surendra , Manchikanti <
surendra.manchika...@gmail.com> wrote:

>
> Looking for a generic solution, not for a specific DB or number of tables.
>
>
> On Fri, Mar 29, 2019 at 5:04 AM Jason Nerothin 
> wrote:
>
>> How many tables? What DB?
>>
>> On Fri, Mar 29, 2019 at 00:50 Surendra , Manchikanti <
>> surendra.manchika...@gmail.com> wrote:
>>
>>> Hi Jason,
>>>
>>> Thanks for your reply, But I am looking for a way to parallelly extract
>>> all the tables in a Database.
>>>
>>>
>>> On Thu, Mar 28, 2019 at 2:50 PM Jason Nerothin 
>>> wrote:
>>>
>>>> Yes.
>>>>
>>>> If you use the numPartitions option, your max parallelism will be that
>>>> number. See also: partitionColumn, lowerBound, and upperBound
>>>>
>>>> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>>>>
>>>> On Wed, Mar 27, 2019 at 23:06 Surendra , Manchikanti <
>>>> surendra.manchika...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> Is there any way to copy all the tables in parallel from RDBMS using
>>>>> Spark? We are looking for a functionality similar to Sqoop.
>>>>>
>>>>> Thanks,
>>>>> Surendra
>>>>>
>>>>> --
>>>> Thanks,
>>>> Jason
>>>>
>>> --
>> Thanks,
>> Jason
>>
> --
Thanks,
Jason


Re: Spark SQL API taking longer time than DF API.

2019-03-30 Thread Jason Nerothin
Can you please quantify the difference and provide the query code?

On Fri, Mar 29, 2019 at 9:11 AM neeraj bhadani 
wrote:

> Hi Team,
>I am executing same spark code using the Spark SQL API and DataFrame
> API, however, Spark SQL is taking longer than expected.
>
> PFB Sudo code.
>
> ---
>
> Case 1 : Spark SQL
>
>
> ---
>
> %sql
>
> CREATE TABLE 
>
> AS
>
>
>  WITH  AS (
>
>  
>
> )
>
> , AS (
>
>  
>
>  )
>
>
> SELECT * FROM 
>
> UNION ALL
>
> SELECT * FROM 
>
>
>
> ---
>
> Case  2 : DataFrame API
>
>
> ---
>
>
> df1 = spark.sql()
>
> df2 = spark.sql()
>
> df3 = df1.union(df2)
>
> df3.write.saveAsTable()
>
>
> ---
>
>
> As per my understanding, both Spark SQL and DtaaFrame API generate the
> same code under the hood and execution time has to be similar.
>
>
> Regards,
>
> Neeraj
>
>
>

-- 
Thanks,
Jason


Re: How to extract data in parallel from RDBMS tables

2019-03-29 Thread Jason Nerothin
How many tables? What DB?

On Fri, Mar 29, 2019 at 00:50 Surendra , Manchikanti <
surendra.manchika...@gmail.com> wrote:

> Hi Jason,
>
> Thanks for your reply, But I am looking for a way to parallelly extract
> all the tables in a Database.
>
>
> On Thu, Mar 28, 2019 at 2:50 PM Jason Nerothin 
> wrote:
>
>> Yes.
>>
>> If you use the numPartitions option, your max parallelism will be that
>> number. See also: partitionColumn, lowerBound, and upperBound
>>
>> https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
>>
>> On Wed, Mar 27, 2019 at 23:06 Surendra , Manchikanti <
>> surendra.manchika...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Is there any way to copy all the tables in parallel from RDBMS using
>>> Spark? We are looking for a functionality similar to Sqoop.
>>>
>>> Thanks,
>>> Surendra
>>>
>>> --
>> Thanks,
>> Jason
>>
> --
Thanks,
Jason


Re: spark.submit.deployMode: cluster

2019-03-28 Thread Jason Nerothin
Meant this one: https://docs.databricks.com/api/latest/jobs.html

On Thu, Mar 28, 2019 at 5:06 PM Pat Ferrel  wrote:

> Thanks, are you referring to
> https://github.com/spark-jobserver/spark-jobserver or the undocumented
> REST job server included in Spark?
>
>
> From: Jason Nerothin  
> Reply: Jason Nerothin  
> Date: March 28, 2019 at 2:53:05 PM
> To: Pat Ferrel  
> Cc: Felix Cheung  , 
> Marcelo
> Vanzin  , user
>  
> Subject:  Re: spark.submit.deployMode: cluster
>
> Check out the Spark Jobs API... it sits behind a REST service...
>
>
> On Thu, Mar 28, 2019 at 12:29 Pat Ferrel  wrote:
>
>> ;-)
>>
>> Great idea. Can you suggest a project?
>>
>> Apache PredictionIO uses spark-submit (very ugly) and Apache Mahout only
>> launches trivially in test apps since most uses are as a lib.
>>
>>
>> From: Felix Cheung 
>> 
>> Reply: Felix Cheung 
>> 
>> Date: March 28, 2019 at 9:42:31 AM
>> To: Pat Ferrel  , Marcelo
>> Vanzin  
>> Cc: user  
>> Subject:  Re: spark.submit.deployMode: cluster
>>
>> If anyone wants to improve docs please create a PR.
>>
>> lol
>>
>>
>> But seriously you might want to explore other projects that manage job
>> submission on top of spark instead of rolling your own with spark-submit.
>>
>>
>> --
>> *From:* Pat Ferrel 
>> *Sent:* Tuesday, March 26, 2019 2:38 PM
>> *To:* Marcelo Vanzin
>> *Cc:* user
>> *Subject:* Re: spark.submit.deployMode: cluster
>>
>> Ahh, thank you indeed!
>>
>> It would have saved us a lot of time if this had been documented. I know,
>> OSS so contributions are welcome… I can also imagine your next comment; “If
>> anyone wants to improve docs see the Apache contribution rules and create a
>> PR.” or something like that.
>>
>> BTW the code where the context is known and can be used is what I’d call
>> a Driver and since all code is copied to nodes and is know in jars, it was
>> not obvious to us that this rule existed but it does make sense.
>>
>> We will need to refactor our code to use spark-submit it appears.
>>
>> Thanks again.
>>
>>
>> From: Marcelo Vanzin  
>> Reply: Marcelo Vanzin  
>> Date: March 26, 2019 at 1:59:36 PM
>> To: Pat Ferrel  
>> Cc: user  
>> Subject:  Re: spark.submit.deployMode: cluster
>>
>> If you're not using spark-submit, then that option does nothing.
>>
>> If by "context creation API" you mean "new SparkContext()" or an
>> equivalent, then you're explicitly creating the driver inside your
>> application.
>>
>> On Tue, Mar 26, 2019 at 1:56 PM Pat Ferrel  wrote:
>> >
>> > I have a server that starts a Spark job using the context creation API.
>> It DOES NOY use spark-submit.
>> >
>> > I set spark.submit.deployMode = “cluster”
>> >
>> > In the GUI I see 2 workers with 2 executors. The link for running
>> application “name” goes back to my server, the machine that launched the
>> job.
>> >
>> > This is spark.submit.deployMode = “client” according to the docs. I set
>> the Driver to run on the cluster but it runs on the client, ignoring the
>> spark.submit.deployMode.
>> >
>> > Is this as expected? It is documented nowhere I can find.
>> >
>>
>>
>> --
>> Marcelo
>>
>> --
> Thanks,
> Jason
>
>

-- 
Thanks,
Jason


Re: spark.submit.deployMode: cluster

2019-03-28 Thread Jason Nerothin
Check out the Spark Jobs API... it sits behind a REST service...


On Thu, Mar 28, 2019 at 12:29 Pat Ferrel  wrote:

> ;-)
>
> Great idea. Can you suggest a project?
>
> Apache PredictionIO uses spark-submit (very ugly) and Apache Mahout only
> launches trivially in test apps since most uses are as a lib.
>
>
> From: Felix Cheung  
> Reply: Felix Cheung 
> 
> Date: March 28, 2019 at 9:42:31 AM
> To: Pat Ferrel  , Marcelo
> Vanzin  
> Cc: user  
> Subject:  Re: spark.submit.deployMode: cluster
>
> If anyone wants to improve docs please create a PR.
>
> lol
>
>
> But seriously you might want to explore other projects that manage job
> submission on top of spark instead of rolling your own with spark-submit.
>
>
> --
> *From:* Pat Ferrel 
> *Sent:* Tuesday, March 26, 2019 2:38 PM
> *To:* Marcelo Vanzin
> *Cc:* user
> *Subject:* Re: spark.submit.deployMode: cluster
>
> Ahh, thank you indeed!
>
> It would have saved us a lot of time if this had been documented. I know,
> OSS so contributions are welcome… I can also imagine your next comment; “If
> anyone wants to improve docs see the Apache contribution rules and create a
> PR.” or something like that.
>
> BTW the code where the context is known and can be used is what I’d call a
> Driver and since all code is copied to nodes and is know in jars, it was
> not obvious to us that this rule existed but it does make sense.
>
> We will need to refactor our code to use spark-submit it appears.
>
> Thanks again.
>
>
> From: Marcelo Vanzin  
> Reply: Marcelo Vanzin  
> Date: March 26, 2019 at 1:59:36 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: spark.submit.deployMode: cluster
>
> If you're not using spark-submit, then that option does nothing.
>
> If by "context creation API" you mean "new SparkContext()" or an
> equivalent, then you're explicitly creating the driver inside your
> application.
>
> On Tue, Mar 26, 2019 at 1:56 PM Pat Ferrel  wrote:
> >
> > I have a server that starts a Spark job using the context creation API.
> It DOES NOY use spark-submit.
> >
> > I set spark.submit.deployMode = “cluster”
> >
> > In the GUI I see 2 workers with 2 executors. The link for running
> application “name” goes back to my server, the machine that launched the
> job.
> >
> > This is spark.submit.deployMode = “client” according to the docs. I set
> the Driver to run on the cluster but it runs on the client, ignoring the
> spark.submit.deployMode.
> >
> > Is this as expected? It is documented nowhere I can find.
> >
>
>
> --
> Marcelo
>
> --
Thanks,
Jason


Re: How to extract data in parallel from RDBMS tables

2019-03-28 Thread Jason Nerothin
Yes.

If you use the numPartitions option, your max parallelism will be that
number. See also: partitionColumn, lowerBound, and upperBound

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

On Wed, Mar 27, 2019 at 23:06 Surendra , Manchikanti <
surendra.manchika...@gmail.com> wrote:

> Hi All,
>
> Is there any way to copy all the tables in parallel from RDBMS using
> Spark? We are looking for a functionality similar to Sqoop.
>
> Thanks,
> Surendra
>
> --
Thanks,
Jason


streaming - absolute maximum

2019-03-25 Thread Jason Nerothin
Hello,

I wish to calculate the most recent event time from a Stream.

Something like this:

val timestamped = records.withColumn("ts_long",
unix_timestamp($"eventTime"))
val lastReport = timestamped
  .withWatermark("eventTime", "4 hours")
  .groupBy(col("eventTime"),
window(col("eventTime"), "10 minutes", "5 minutes"))
  .max("ts_long")
  .writeStream
  .foreach(new LastReportUpdater(stationId))
  .start()

During normal execution, I expect to receive a few events per minute, at
most.

So now for the problem: During system initiation, I batch load a longer
history of data (stretching back months). Because the volume is higher
during initiation, records arrive with lots of time skew.

I'm saving the result off to a database and want to update it in realtime
during streaming operation.

Do I write to flavors of the query - one as a static Dataset for initiation
and another for realtime? Is my logic incorrect?

Thanks,
Jason
-- 
Thanks,
Jason


Upcoming talks on BigDL and Analytics Zoo this week

2019-03-24 Thread Jason Dai
Hi all,



Please see below for a list of upcoming technical sessions
on BigDL and Analytics Zoo (
https://github.com/intel-analytics/analytics-zoo/) this week:


   - Engineers from Intel will deliver a 3-hour tutorial Analytics Zoo:
   Distributed TensorFlow and Keras on Apache Spark
   
<https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/72802>
at *Strata Data Conference in San Francisco* (March 26, 1:30–5:00pm)
   - Engineers from Office Depot and Intel will present a technical
talk User-based
   real-time product recommendations leveraging deep learning using Analytics
   Zoo on Apache Spark and BigDL
   
<https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/73079>
at *Strata Data Conference in San Francisco* (March 27, 4:20–5:00pm)
   - Engineers from Intel will host a poster session Analytics Zoo: Unified
   Analytics + AI Platform for Big Data <http://scaledml.org/2019/> at
*ScaledML
   2019 in Mountain View* (March 27, 5:00–7:00PM)
   - Engineers from Intel will present a technical talk Analytics Zoo:
   Distributed TensorFlow in production on Apache Spark
   
<https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/72802>
at *Strata Data Conference in San Francisco* (March 28, 3:50–4:30pm)

If you plan to attend these events, please drop by and talk to the speakers
:-)



Thanks,

-Jason


Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-14 Thread Jason Boorn
Ok great I’ll give that a shot -

Thanks for all the help

> On Apr 14, 2018, at 12:08 PM, Gene Pang <gene.p...@gmail.com> wrote:
> 
> Yes, I think that is the case. I haven't tried that before, but it should 
> work.
> 
> Thanks,
> Gene
> 
> On Fri, Apr 13, 2018 at 11:32 AM, Jason Boorn <jbo...@gmail.com 
> <mailto:jbo...@gmail.com>> wrote:
> Hi Gene - 
> 
> Are you saying that I just need to figure out how to get the Alluxio jar into 
> the classpath of my parent application?  If it shows up in the classpath then 
> Spark will automatically know that it needs to use it when communicating with 
> Alluxio?
> 
> Apologies for going back-and-forth on this - I feel like my particular use 
> case is clouding what is already a tricky issue.
> 
>> On Apr 13, 2018, at 2:26 PM, Gene Pang <gene.p...@gmail.com 
>> <mailto:gene.p...@gmail.com>> wrote:
>> 
>> Hi Jason,
>> 
>> Alluxio does work with Spark in master=local mode. This is because both 
>> spark-submit and spark-shell have command-line options to set the classpath 
>> for the JVM that is being started.
>> 
>> If you are not using spark-submit or spark-shell, you will have to figure 
>> out how to configure that JVM instance with the proper properties.
>> 
>> Thanks,
>> Gene
>> 
>> On Fri, Apr 13, 2018 at 10:47 AM, Jason Boorn <jbo...@gmail.com 
>> <mailto:jbo...@gmail.com>> wrote:
>> Ok thanks - I was basing my design on this:
>> 
>> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
>>  
>> <https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html>
>> 
>> Wherein it says:
>> Once the SparkSession is instantiated, you can configure Spark’s runtime 
>> config properties. 
>> Apparently the suite of runtime configs you can change does not include 
>> classpath.  
>> 
>> So the answer to my original question is basically this:
>> 
>> When using local (pseudo-cluster) mode, there is no way to add external jars 
>> to the spark instance.  This means that Alluxio will not work with Spark 
>> when Spark is run in master=local mode.
>> 
>> Thanks again - often getting a definitive “no” is almost as good as a yes.  
>> Almost ;)
>> 
>>> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin <van...@cloudera.com 
>>> <mailto:van...@cloudera.com>> wrote:
>>> 
>>> There are two things you're doing wrong here:
>>> 
>>> On Thu, Apr 12, 2018 at 6:32 PM, jb44 <jbo...@gmail.com 
>>> <mailto:jbo...@gmail.com>> wrote:
>>>> Then I can add the alluxio client library like so:
>>>> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
>>> 
>>> First one, you can't modify JVM configuration after it has already
>>> started. So this line does nothing since it can't re-launch your
>>> application with a new JVM.
>>> 
>>>> sparkSession.conf.set("spark.executor.extraClassPath", 
>>>> ALLUXIO_SPARK_CLIENT)
>>> 
>>> There is a lot of configuration that you cannot set after the
>>> application has already started. For example, after the session is
>>> created, most probably this option will be ignored, since executors
>>> will already have started.
>>> 
>>> I'm not so sure about what happens when you use dynamic allocation,
>>> but these post-hoc config changes in general are not expected to take
>>> effect.
>>> 
>>> The documentation could be clearer about this (especially stuff that
>>> only applies to spark-submit), but that's the gist of it.
>>> 
>>> 
>>> -- 
>>> Marcelo
>> 
>> 
> 
> 



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Hi Gene - 

Are you saying that I just need to figure out how to get the Alluxio jar into 
the classpath of my parent application?  If it shows up in the classpath then 
Spark will automatically know that it needs to use it when communicating with 
Alluxio?

Apologies for going back-and-forth on this - I feel like my particular use case 
is clouding what is already a tricky issue.

> On Apr 13, 2018, at 2:26 PM, Gene Pang <gene.p...@gmail.com> wrote:
> 
> Hi Jason,
> 
> Alluxio does work with Spark in master=local mode. This is because both 
> spark-submit and spark-shell have command-line options to set the classpath 
> for the JVM that is being started.
> 
> If you are not using spark-submit or spark-shell, you will have to figure out 
> how to configure that JVM instance with the proper properties.
> 
> Thanks,
> Gene
> 
> On Fri, Apr 13, 2018 at 10:47 AM, Jason Boorn <jbo...@gmail.com 
> <mailto:jbo...@gmail.com>> wrote:
> Ok thanks - I was basing my design on this:
> 
> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
>  
> <https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html>
> 
> Wherein it says:
> Once the SparkSession is instantiated, you can configure Spark’s runtime 
> config properties. 
> Apparently the suite of runtime configs you can change does not include 
> classpath.  
> 
> So the answer to my original question is basically this:
> 
> When using local (pseudo-cluster) mode, there is no way to add external jars 
> to the spark instance.  This means that Alluxio will not work with Spark when 
> Spark is run in master=local mode.
> 
> Thanks again - often getting a definitive “no” is almost as good as a yes.  
> Almost ;)
> 
>> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin <van...@cloudera.com 
>> <mailto:van...@cloudera.com>> wrote:
>> 
>> There are two things you're doing wrong here:
>> 
>> On Thu, Apr 12, 2018 at 6:32 PM, jb44 <jbo...@gmail.com 
>> <mailto:jbo...@gmail.com>> wrote:
>>> Then I can add the alluxio client library like so:
>>> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
>> 
>> First one, you can't modify JVM configuration after it has already
>> started. So this line does nothing since it can't re-launch your
>> application with a new JVM.
>> 
>>> sparkSession.conf.set("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT)
>> 
>> There is a lot of configuration that you cannot set after the
>> application has already started. For example, after the session is
>> created, most probably this option will be ignored, since executors
>> will already have started.
>> 
>> I'm not so sure about what happens when you use dynamic allocation,
>> but these post-hoc config changes in general are not expected to take
>> effect.
>> 
>> The documentation could be clearer about this (especially stuff that
>> only applies to spark-submit), but that's the gist of it.
>> 
>> 
>> -- 
>> Marcelo
> 
> 



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Ok thanks - I was basing my design on this:

https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
 


Wherein it says:
Once the SparkSession is instantiated, you can configure Spark’s runtime config 
properties. 
Apparently the suite of runtime configs you can change does not include 
classpath.  

So the answer to my original question is basically this:

When using local (pseudo-cluster) mode, there is no way to add external jars to 
the spark instance.  This means that Alluxio will not work with Spark when 
Spark is run in master=local mode.

Thanks again - often getting a definitive “no” is almost as good as a yes.  
Almost ;)

> On Apr 13, 2018, at 1:21 PM, Marcelo Vanzin  wrote:
> 
> There are two things you're doing wrong here:
> 
> On Thu, Apr 12, 2018 at 6:32 PM, jb44  wrote:
>> Then I can add the alluxio client library like so:
>> sparkSession.conf.set("spark.driver.extraClassPath", ALLUXIO_SPARK_CLIENT)
> 
> First one, you can't modify JVM configuration after it has already
> started. So this line does nothing since it can't re-launch your
> application with a new JVM.
> 
>> sparkSession.conf.set("spark.executor.extraClassPath", ALLUXIO_SPARK_CLIENT)
> 
> There is a lot of configuration that you cannot set after the
> application has already started. For example, after the session is
> created, most probably this option will be ignored, since executors
> will already have started.
> 
> I'm not so sure about what happens when you use dynamic allocation,
> but these post-hoc config changes in general are not expected to take
> effect.
> 
> The documentation could be clearer about this (especially stuff that
> only applies to spark-submit), but that's the gist of it.
> 
> 
> -- 
> Marcelo



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Thanks - I’ve seen this SO post, it covers spark-submit, which I am not using.

Regarding the ALLUXIO_SPARK_CLIENT variable, it is located on the machine that 
is running the job which spawns the master=local spark.  According to the Spark 
documentation, this should be possible, but it appears it is not.

Once again - I’m trying to solve the use case for master=local, NOT for a 
cluster and NOT with spark-submit.  

> On Apr 13, 2018, at 12:47 PM, yohann jardin <yohannjar...@hotmail.com> wrote:
> 
> Hey Jason,
> Might be related to what is behind your variable ALLUXIO_SPARK_CLIENT and 
> where is located the lib (is it on HDFS, on the node that submits the job, or 
> locally to all spark workers?)
> There is a great post on SO about it: https://stackoverflow.com/a/37348234 
> <https://stackoverflow.com/a/37348234>
> We might as well check that you provide correctly the jar based on its 
> location. I have found it tricky in some cases.
> As a debug try, if the jar is not on HDFS, you can copy it there and then 
> specify the full path in the extraclasspath property. 
> Regards,
> Yohann Jardin
> 
> Le 4/13/2018 à 5:38 PM, Jason Boorn a écrit :
>> I do, and this is what I will fall back to if nobody has a better idea :)
>> 
>> I was just hoping to get this working as it is much more convenient for my 
>> testing pipeline.
>> 
>> Thanks again for the help
>> 
>>> On Apr 13, 2018, at 11:33 AM, Geoff Von Allmen <ge...@ibleducation.com 
>>> <mailto:ge...@ibleducation.com>> wrote:
>>> 
>>> Ok - `LOCAL` makes sense now.
>>> 
>>> Do you have the option to still use `spark-submit` in this scenario, but 
>>> using the following options:
>>> 
>>> ```bash
>>> --master "local[*]" \
>>> --deploy-mode "client" \
>>> ...
>>> ```
>>> 
>>> I know in the past, I have setup some options using `.config("Option", 
>>> "value")` when creating the spark session, and then other runtime options 
>>> as you describe above with `spark.conf.set`. At this point though I've just 
>>> moved everything out into a `spark-submit` script.
>>> 
>>> On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn <jbo...@gmail.com 
>>> <mailto:jbo...@gmail.com>> wrote:
>>> Hi Geoff -
>>> 
>>> Appreciate the help here - I do understand what you’re saying below.  And I 
>>> am able to get this working when I submit a job to a local cluster.
>>> 
>>> I think part of the issue here is that there’s ambiguity in the 
>>> terminology.  When I say “LOCAL” spark, I mean an instance of spark that is 
>>> created by my driver program, and is not a cluster itself.  It means that 
>>> my master node is “local”, and this mode is primarily used for testing.
>>> 
>>> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>>>  
>>> <https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html>
>>> 
>>> While I am able to get alluxio working with spark-submit, I am unable to 
>>> get it working when using local mode.  The mechanisms for setting class 
>>> paths during spark-submit are not available in local mode.  My 
>>> understanding is that all one is able to use is:
>>> 
>>> spark.conf.set(“”)
>>> 
>>> To set any runtime properties of the local instance.  Note that it is 
>>> possible (and I am more convinced of this as time goes on) that alluxio 
>>> simply does not work in spark local mode as described above.
>>> 
>>> 
>>>> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen <ge...@ibleducation.com 
>>>> <mailto:ge...@ibleducation.com>> wrote:
>>>> 
>>>> I fought with a 
>>>> ClassNotFoundException for quite some time, but it was for kafka.
>>>> 
>>>> The final configuration that got everything working was running 
>>>> spark-submit with the following options:
>>>> 
>>>> --jars "/path/to/.ivy2/jars/package.jar" \
>>>> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
>>>> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
>>>> --packages org.some.package:package_name:version
>>>> While this was needed for me to run in 
>>>> cluster mode, it works equally well for 
>>>> client mode as well.
>>>> 
>>>> One other note when needing to supplied multiple items to these 

Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
I do, and this is what I will fall back to if nobody has a better idea :)

I was just hoping to get this working as it is much more convenient for my 
testing pipeline.

Thanks again for the help

> On Apr 13, 2018, at 11:33 AM, Geoff Von Allmen <ge...@ibleducation.com> wrote:
> 
> Ok - `LOCAL` makes sense now.
> 
> Do you have the option to still use `spark-submit` in this scenario, but 
> using the following options:
> 
> ```bash
> --master "local[*]" \
> --deploy-mode "client" \
> ...
> ```
> 
> I know in the past, I have setup some options using `.config("Option", 
> "value")` when creating the spark session, and then other runtime options as 
> you describe above with `spark.conf.set`. At this point though I've just 
> moved everything out into a `spark-submit` script.
> 
> On Fri, Apr 13, 2018 at 8:18 AM, Jason Boorn <jbo...@gmail.com 
> <mailto:jbo...@gmail.com>> wrote:
> Hi Geoff -
> 
> Appreciate the help here - I do understand what you’re saying below.  And I 
> am able to get this working when I submit a job to a local cluster.
> 
> I think part of the issue here is that there’s ambiguity in the terminology.  
> When I say “LOCAL” spark, I mean an instance of spark that is created by my 
> driver program, and is not a cluster itself.  It means that my master node is 
> “local”, and this mode is primarily used for testing.
> 
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>  
> <https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html>
> 
> While I am able to get alluxio working with spark-submit, I am unable to get 
> it working when using local mode.  The mechanisms for setting class paths 
> during spark-submit are not available in local mode.  My understanding is 
> that all one is able to use is:
> 
> spark.conf.set(“”)
> 
> To set any runtime properties of the local instance.  Note that it is 
> possible (and I am more convinced of this as time goes on) that alluxio 
> simply does not work in spark local mode as described above.
> 
> 
>> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen <ge...@ibleducation.com 
>> <mailto:ge...@ibleducation.com>> wrote:
>> 
>> I fought with a ClassNotFoundException for quite some time, but it was for 
>> kafka.
>> 
>> The final configuration that got everything working was running spark-submit 
>> with the following options:
>> 
>> --jars "/path/to/.ivy2/jars/package.jar" \
>> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
>> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
>> --packages org.some.package:package_name:version
>> While this was needed for me to run in cluster mode, it works equally well 
>> for client mode as well.
>> 
>> One other note when needing to supplied multiple items to these args - 
>> --jars and --packages should be comma separated, --driver-class-path and 
>> extraClassPath should be : separated
>> 
>> HTH
>> 
>> 
>> On Fri, Apr 13, 2018 at 4:28 AM, jb44 <jbo...@gmail.com 
>> <mailto:jbo...@gmail.com>> wrote:
>> Haoyuan -
>> 
>> As I mentioned below, I've been through the documentation already.  It has
>> not helped me to resolve the issue.
>> 
>> Here is what I have tried so far:
>> 
>> - setting extraClassPath as explained below
>> - adding fs.alluxio.impl through sparkconf
>> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
>> this matters in my case)
>> - compiling the client from source 
>> 
>> Do you have any other suggestions on how to get this working?  
>> 
>> Thanks
>> 
>> 
>> 
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 
>> <http://apache-spark-user-list.1001560.n3.nabble.com/>
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
>> 
> 
> 



Re: Spark LOCAL mode and external jar (extraClassPath)

2018-04-13 Thread Jason Boorn
Hi Geoff -

Appreciate the help here - I do understand what you’re saying below.  And I am 
able to get this working when I submit a job to a local cluster.

I think part of the issue here is that there’s ambiguity in the terminology.  
When I say “LOCAL” spark, I mean an instance of spark that is created by my 
driver program, and is not a cluster itself.  It means that my master node is 
“local”, and this mode is primarily used for testing.

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
 


While I am able to get alluxio working with spark-submit, I am unable to get it 
working when using local mode.  The mechanisms for setting class paths during 
spark-submit are not available in local mode.  My understanding is that all one 
is able to use is:

spark.conf.set(“”)

To set any runtime properties of the local instance.  Note that it is possible 
(and I am more convinced of this as time goes on) that alluxio simply does not 
work in spark local mode as described above.


> On Apr 13, 2018, at 11:09 AM, Geoff Von Allmen  wrote:
> 
> I fought with a ClassNotFoundException for quite some time, but it was for 
> kafka.
> 
> The final configuration that got everything working was running spark-submit 
> with the following options:
> 
> --jars "/path/to/.ivy2/jars/package.jar" \
> --driver-class-path "/path/to/.ivy2/jars/package.jar" \
> --conf "spark.executor.extraClassPath=/path/to/.ivy2/package.jar" \
> --packages org.some.package:package_name:version
> While this was needed for me to run in cluster mode, it works equally well 
> for client mode as well.
> 
> One other note when needing to supplied multiple items to these args - --jars 
> and --packages should be comma separated, --driver-class-path and 
> extraClassPath should be : separated
> 
> HTH
> 
> 
> On Fri, Apr 13, 2018 at 4:28 AM, jb44  > wrote:
> Haoyuan -
> 
> As I mentioned below, I've been through the documentation already.  It has
> not helped me to resolve the issue.
> 
> Here is what I have tried so far:
> 
> - setting extraClassPath as explained below
> - adding fs.alluxio.impl through sparkconf
> - adding spark.sql.hive.metastore.sharedPrefixes (though I don't believe
> this matters in my case)
> - compiling the client from source 
> 
> Do you have any other suggestions on how to get this working?  
> 
> Thanks
> 
> 
> 
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 



[Spark SQL] How to run a custom meta query for `ANALYZE TABLE`

2018-01-02 Thread Jason Heo
Hi,

I'm working on integrating Spark and a custom data source.

Most things go well with nice Spark Data Source APIs (Thanks to well
designed APIs)

But, one thing I couldn't resolve is that how to execute custom meta query
for `ANALYZE TABLE`

The custom data source I'm currently working on has a meta query so we can
get MIN/MAX/Cardinality without full scan.

What I want to do is that when `ANALYZE TABLE` is executed over the custom
data source then execute custom meta query rather than executing Full
Scanning.

If this is not possible, I'm considering inserting stats into metastore_db
manually. Is there any API exposed to handle metastore_db (e.g.
insert/delete meta db)?

Regards,

Jason


spark.pyspark.python is ignored?

2017-06-29 Thread Jason White
According to the documentation, `spark.pyspark.python` configures which
python executable is run on the workers. It seems to be ignored in my simple
test cast. I'm running on a pip-installed Pyspark 2.1.1, completely stock.
The only customization at this point is my Hadoop configuration directory.

In the below code, the `PYSPARK_PYTHON` value is used, so `session` is a
functioning SparkSession. However, it shouldn't be; `spark.pyspark.python`is
set to a nonsense value, and should take priority. If I take out the env
variable, it just loads python2 - this value doesn't appear to have any
impact for me.

Any suggestions?


import os
import pprint
import pyspark

ip = '10.30.50.73'

conf_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), 'conf',
'cloudera.yarn'))
os.environ['YARN_CONF_DIR'] = conf_dir
os.environ['HADOOP_CONF_DIR'] = conf_dir
os.environ['PYSPARK_PYTHON'] = '/u/pyenv/versions/3.6.1/bin/python3'

config = pyspark.SparkConf(loadDefaults=False)
config.set('spark.driver.host', ip)
config.set('spark.master', 'yarn')
config.set('spark.submit.deployMode', 'client')
config.set('spark.pyspark.python', 'foo/bar')

spark_builder = pyspark.sql.SparkSession.builder.config(conf=config)
session = spark_builder.getOrCreate()

context = session.sparkContext
config_string = pprint.pformat({key: value for key, value in
context.getConf().getAll()})
print(config_string)

import IPython
IPython.embed()



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-pyspark-python-is-ignored-tp28808.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Create dataset from dataframe with missing columns

2017-06-14 Thread Tokayer, Jason M.
Is it possible to concisely create a dataset from a dataframe with missing 
columns? Specifically, suppose I create a dataframe with:
val df: DataFrame  = Seq(("v1"),("v2")).toDF("f1")

Then, I have a case class for a dataset defined as:
case class CC(f1: String, f2: Option[String] = None)

I’d like to use df.as[CC] to get an instance of the case class, but this gives 
me the following error:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];

Is there a concise way to use the default values as defined by the case class?



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Create dataset from dataframe with missing columns

2017-06-14 Thread Tokayer, Jason M.
Is it possible to concisely create a dataset from a dataframe with missing 
columns? Specifically, suppose I create a dataframe with:
val df: DataFrame  = Seq(("v1"),("v2")).toDF("f1")

Then, I have a case class for a dataset defined as:
case class CC(f1: String, f2: Option[String] = None)

I’d like to use df.as[CC] to get an instance of the case class, but this gives 
me the following error:
org.apache.spark.sql.AnalysisException: cannot resolve '`f2`' given input 
columns: [f1];

Is there a concise way to use the default values as defined by the case class?


The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Case class with POJO - encoder issues

2017-02-11 Thread Jason White
I'd like to create a Dataset using some classes from Geotools to do some
geospatial analysis. In particular, I'm trying to use Spark to distribute
the work based on ID and label fields that I extract from the polygon data.

My simplified case class looks like this:
implicit val geometryEncoder: Encoder[Geometry] = Encoders.kryo[Geometry]
case class IndexedGeometry(label: String, tract: Geometry)

When I try to create a dataset using this case class, it give me this error
message:
Exception in thread "main" java.lang.UnsupportedOperationException: No
Encoder found for com.vividsolutions.jts.geom.Geometry
- field (class: "com.vividsolutions.jts.geom.Geometry", name: "tract")
- root class: "org.me.HelloWorld.IndexedGeometry"

If I add another encoder for my case class...:
implicit val indexedGeometryEncoder: Encoder[IndexedGeometry] =
Encoders.kryo[IndexedGeometry]

...it works, but now the entire dataset has a single field, "value", and
it's a binary blob.

Is there a way to do what I'm trying to do?
I believe this PR is related, but it's been idle since December:
https://github.com/apache/spark/pull/15918




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Case-class-with-POJO-encoder-issues-tp28381.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



[ML] Converting ml.DenseVector to mllib.Vector

2016-12-30 Thread Jason Wolosonovich

Hello All,

I'm working through the Data Science with Scala course on Big Data 
University and it is not updated to work with Spark 2.0, so I'm adapting 
the code as I work through it, however I've finally run into something 
that is over my head. I'm new to Scala as well.


When I run this code 
(https://gist.github.com/jmwoloso/a715cc4d7f1e7cc7951fab4edf6218b1) I 
get the following error:


`java.lang.ClassCastException: org.apache.spark.ml.linalg.DenseVector 
cannot be cast to org.apache.spark.mllib.linalg.Vector`


I believe this is occurring at line 107 of the gist above. The code 
starting at this line (and continuing to the end of the gist) is the 
current code in the course.


If I try to map to any other class type, then I have problems with the 
`Statistics.corr(rddVec)`.


How can I convert `rddVec` from an `ml.linalg.DenseVector` into an 
`mllib.linalg.Vector` for use with `Statistics`?


Thanks!

-Jason

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



UDAF collect_list: Hive Query or spark sql expression

2016-09-23 Thread Jason Mop
Hi Spark Team,

I see most Hive function have been implemented by Spark SQL expression, but
collect_list is still using Hive Query, will it also be implemented by
Expression in future? any update?

Cheers,
Ming


Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Jason
We do the exact same approach you proposed for converting horrible text
formats (VCF in the bioinformatics domain) into DataFrames. This involves
creating the schema dynamically based on the header of the file too.

It's simple and easy, but if you need something higher performance you
might need to look into custom DataSet encoders though I'm not sure what
kind of gain (if any) you'd get with that approach.

Jason

On Fri, Jun 17, 2016, 12:38 PM Everett Anderson <ever...@nuna.com.invalid>
wrote:

> Hi,
>
> I have a system with files in a variety of non-standard input formats,
> though they're generally flat text files. I'd like to dynamically create
> DataFrames of string columns.
>
> What's the best way to go from a RDD to a DataFrame of StringType
> columns?
>
> My current plan is
>
>- Call map() on the RDD with a function to split the String
>into columns and call RowFactory.create() with the resulting array,
>creating a RDD
>- Construct a StructType schema using column names and StringType
>- Call SQLContext.createDataFrame(RDD, schema) to create the result
>
> Does that make sense?
>
> I looked through the spark-csv package a little and noticed that it's
> using baseRelationToDataFrame(), but BaseRelation looks like it might be a
> restricted developer API. Anyone know if it's recommended for use?
>
> Thanks!
>
> - Everett
>
>


Re: DeepSpark: where to start

2016-05-05 Thread Jason Nerothin
Just so that there is no confusion, there is a Spark user interface project
called DeepSense that is actually useful: http://deepsense.io

I am not affiliated with them in any way...

On Thu, May 5, 2016 at 9:42 AM, Joice Joy  wrote:

> What the heck, I was already beginning to like it.
>
> On Thu, May 5, 2016 at 12:31 PM, Mark Vervuurt 
> wrote:
>
>> Wel you got me fooled as wel ;)
>> Had it on my todolist to dive into this new component...
>>
>> Mark
>>
>> > Op 5 mei 2016 om 07:06 heeft Derek Chan  het
>> volgende geschreven:
>> >
>> > The blog post is a April Fool's joke. Read the last line in the post:
>> >
>> >
>> https://databricks.com/blog/2016/04/01/unreasonable-effectiveness-of-deep-learning-on-spark.html
>> >
>> >
>> >
>> >> On Thursday, May 05, 2016 10:42 AM, Joice Joy wrote:
>> >> I am trying to find info on deepspark. I read the article on
>> databricks blog which doesnt mention a git repo but does say its open
>> source.
>> >> Help me find the git repo for this. I found two and not sure which one
>> is
>> >> the databricks deepspark:
>> >> https://github.com/deepspark/deepspark
>> >> https://github.com/nearbydelta/deepspark
>> >
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
Let me be more detailed in my response:

Kafka works on “at least once” semantics. Therefore, given your assumption that 
Kafka "will be operational", we can assume that at least once semantics will 
hold.

At this point, it comes down to designing for consumer (really Spark Executor) 
resilience.

From a DC standpoint, you can use an in memory data fabric, like is provided by 
InsightEdge, http://insightedge.io/docs/010/index.html 
<http://insightedge.io/docs/010/index.html>. In this case, WAN replication out 
to other DCs is available at the Data Grid layer. See here: 
http://www.gigaspaces.com/Data-Replication 
<http://www.gigaspaces.com/Data-Replication>. 

Assuming that the consumers respect at least once semantics (that is: don’t 
attempt to keep track of the offset or any other state), then Spark can 
parallelize execution using Dumb Consumers. The backing data fabric can do what 
it does best, which is conflict resolution in the case that a DC goes down for 
a period of time.

One advantage of this architecture is that it can be used to load balance, 
reducing infrastructure costs.

Of course, the CAP theorem is still in play, so things like intra-DC latencies 
and consistency SLAs need to be considered. But in principle, you can balance 
competing concerns against one another based on business requirements.

HTH

> On Apr 19, 2016, at 10:53 AM, Erwan ALLAIN <eallain.po...@gmail.com> wrote:
> 
> Cody, you're right that was an example. Target architecture would be 3 DCs :) 
> Good point on ZK, I'll have to check that.
> 
> About Spark, both instances will run at the same time but on different 
> topics. That would be quite useless to have to 2DCs working on the same set 
> of data.
> I just want, in case of crash, that the healthy spark works on all topics 
> (retrieve dead spark load). 
> 
> Does it seem an awkward design ?
> 
> On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger <c...@koeninger.org 
> <mailto:c...@koeninger.org>> wrote:
> Maybe I'm missing something, but I don't see how you get a quorum in only 2 
> datacenters (without splitbrain problem, etc).  I also don't know how well ZK 
> will work cross-datacenter.
> 
> As far as the spark side of things goes, if it's idempotent, why not just run 
> both instances all the time.
> 
> 
> 
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN <eallain.po...@gmail.com 
> <mailto:eallain.po...@gmail.com>> wrote:
> I'm describing a disaster recovery but it can be used to make one datacenter 
> offline for upgrade for instance.
> 
> From my point of view when DC2 crashes:
> 
> On Kafka side:
> - kafka cluster will lose one or more broker (partition leader and replica)
> - partition leader lost will be reelected in the remaining healthy DC
> 
> => if the number of in-sync replicas are above the minimum threshold, kafka 
> should be operational
> 
> On downstream datastore side (say Cassandra for instance):
> - deploy accross the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
> 
> => it should be ok (depends on replication factor)
> 
> On Spark:
> - treatment should be idempotent, it will allow us to restart from the last 
> commited offset
> 
> I understand that starting up a post crash job would work.
> 
> Question is: how can we detect when DC2 crashes to start a new job ?
> 
> dynamic topic partition (at each kafkaRDD creation for instance) + topic  
> subscription may be the answer ?
> 
> I appreciate your effort.
> 
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin <jasonnerot...@gmail.com 
> <mailto:jasonnerot...@gmail.com>> wrote:
> It the main concern uptime or disaster recovery?
> 
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger <c...@koeninger.org 
>> <mailto:c...@koeninger.org>> wrote:
>> 
>> I think the bigger question is what happens to Kafka and your downstream 
>> data store when DC2 crashes.
>> 
>> From a Spark point of view, starting up a post-crash job in a new data 
>> center isn't really different from starting up a post-crash job in the 
>> original data center.
>> 
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com 
>> <mailto:eallain.po...@gmail.com>> wrote:
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>> 
>> As I mentionned before, I'm planning to use one kafka cluster and 2 or more 
>> spark cluster distinct. 
>> 
>> Let's say we have the following DCs configuration in a nominal case. 
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>> 
>> DataCenter   Spark   Kafka Consumer GroupKafka partition (P1 to P4)
>> DC 1 Master 1.1  
>> 
>> Wor

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
At that scale, it’s best not to do coordination at the application layer. 

How much of your data is transactional in nature {all, some, none}? By which I 
mean ACID-compliant.

> On Apr 19, 2016, at 10:53 AM, Erwan ALLAIN <eallain.po...@gmail.com> wrote:
> 
> Cody, you're right that was an example. Target architecture would be 3 DCs :) 
> Good point on ZK, I'll have to check that.
> 
> About Spark, both instances will run at the same time but on different 
> topics. That would be quite useless to have to 2DCs working on the same set 
> of data.
> I just want, in case of crash, that the healthy spark works on all topics 
> (retrieve dead spark load). 
> 
> Does it seem an awkward design ?
> 
> On Tue, Apr 19, 2016 at 5:38 PM, Cody Koeninger <c...@koeninger.org 
> <mailto:c...@koeninger.org>> wrote:
> Maybe I'm missing something, but I don't see how you get a quorum in only 2 
> datacenters (without splitbrain problem, etc).  I also don't know how well ZK 
> will work cross-datacenter.
> 
> As far as the spark side of things goes, if it's idempotent, why not just run 
> both instances all the time.
> 
> 
> 
> On Tue, Apr 19, 2016 at 10:21 AM, Erwan ALLAIN <eallain.po...@gmail.com 
> <mailto:eallain.po...@gmail.com>> wrote:
> I'm describing a disaster recovery but it can be used to make one datacenter 
> offline for upgrade for instance.
> 
> From my point of view when DC2 crashes:
> 
> On Kafka side:
> - kafka cluster will lose one or more broker (partition leader and replica)
> - partition leader lost will be reelected in the remaining healthy DC
> 
> => if the number of in-sync replicas are above the minimum threshold, kafka 
> should be operational
> 
> On downstream datastore side (say Cassandra for instance):
> - deploy accross the 2 DCs in (QUORUM / QUORUM)
> - idempotent write
> 
> => it should be ok (depends on replication factor)
> 
> On Spark:
> - treatment should be idempotent, it will allow us to restart from the last 
> commited offset
> 
> I understand that starting up a post crash job would work.
> 
> Question is: how can we detect when DC2 crashes to start a new job ?
> 
> dynamic topic partition (at each kafkaRDD creation for instance) + topic  
> subscription may be the answer ?
> 
> I appreciate your effort.
> 
> On Tue, Apr 19, 2016 at 4:25 PM, Jason Nerothin <jasonnerot...@gmail.com 
> <mailto:jasonnerot...@gmail.com>> wrote:
> It the main concern uptime or disaster recovery?
> 
>> On Apr 19, 2016, at 9:12 AM, Cody Koeninger <c...@koeninger.org 
>> <mailto:c...@koeninger.org>> wrote:
>> 
>> I think the bigger question is what happens to Kafka and your downstream 
>> data store when DC2 crashes.
>> 
>> From a Spark point of view, starting up a post-crash job in a new data 
>> center isn't really different from starting up a post-crash job in the 
>> original data center.
>> 
>> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com 
>> <mailto:eallain.po...@gmail.com>> wrote:
>> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
>> 
>> As I mentionned before, I'm planning to use one kafka cluster and 2 or more 
>> spark cluster distinct. 
>> 
>> Let's say we have the following DCs configuration in a nominal case. 
>> Kafka partitions are consumed uniformly by the 2 datacenters.
>> 
>> DataCenter   Spark   Kafka Consumer GroupKafka partition (P1 to P4)
>> DC 1 Master 1.1  
>> 
>> Worker 1.1   my_groupP1
>> Worker 1.2   my_groupP2
>> DC 2 Master 2.1  
>> 
>> Worker 2.1   my_groupP3
>> Worker 2.2   my_groupP4
>> 
>> I would like, in case of DC crash, a rebalancing of partition on the healthy 
>> DC, something as follow
>> 
>> DataCenter   Spark   Kafka Consumer GroupKafka partition (P1 to P4)
>> DC 1 Master 1.1  
>> 
>> Worker 1.1   my_groupP1, P3
>> Worker 1.2   my_groupP2, P4
>> DC 2 Master 2.1  
>> 
>> Worker 2.1   my_groupP3
>> Worker 2.2   my_groupP4
>> 
>> I would like to know if it's possible:
>> - using consumer group ?
>> - using direct approach ? I prefer this one as I don't want to activate WAL.
>> 
>> Hope the explanation is better !
>> 
>> 
>> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger <c...@koeninger.org 
>> <mailto:c...@koeninger.org>> wrote:
>> The current direct stream only handles exactly the partitions
>> specified at startup.  You'd have to restart the job if

Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-19 Thread Jason Nerothin
It the main concern uptime or disaster recovery?

> On Apr 19, 2016, at 9:12 AM, Cody Koeninger <c...@koeninger.org> wrote:
> 
> I think the bigger question is what happens to Kafka and your downstream data 
> store when DC2 crashes.
> 
> From a Spark point of view, starting up a post-crash job in a new data center 
> isn't really different from starting up a post-crash job in the original data 
> center.
> 
> On Tue, Apr 19, 2016 at 3:32 AM, Erwan ALLAIN <eallain.po...@gmail.com 
> <mailto:eallain.po...@gmail.com>> wrote:
> Thanks Jason and Cody. I'll try to explain a bit better the Multi DC case.
> 
> As I mentionned before, I'm planning to use one kafka cluster and 2 or more 
> spark cluster distinct. 
> 
> Let's say we have the following DCs configuration in a nominal case. 
> Kafka partitions are consumed uniformly by the 2 datacenters.
> 
> DataCenterSpark   Kafka Consumer GroupKafka partition (P1 to P4)
> DC 1  Master 1.1  
> 
> Worker 1.1my_groupP1
> Worker 1.2my_groupP2
> DC 2  Master 2.1  
> 
> Worker 2.1my_groupP3
> Worker 2.2my_groupP4
> 
> I would like, in case of DC crash, a rebalancing of partition on the healthy 
> DC, something as follow
> 
> DataCenterSpark   Kafka Consumer GroupKafka partition (P1 to P4)
> DC 1  Master 1.1  
> 
> Worker 1.1my_groupP1, P3
> Worker 1.2my_groupP2, P4
> DC 2  Master 2.1  
> 
> Worker 2.1my_groupP3
> Worker 2.2my_groupP4
> 
> I would like to know if it's possible:
> - using consumer group ?
> - using direct approach ? I prefer this one as I don't want to activate WAL.
> 
> Hope the explanation is better !
> 
> 
> On Mon, Apr 18, 2016 at 6:50 PM, Cody Koeninger <c...@koeninger.org 
> <mailto:c...@koeninger.org>> wrote:
> The current direct stream only handles exactly the partitions
> specified at startup.  You'd have to restart the job if you changed
> partitions.
> 
> https://issues.apache.org/jira/browse/SPARK-12177 
> <https://issues.apache.org/jira/browse/SPARK-12177> has the ongoing work
> towards using the kafka 0.10 consumer, which would allow for dynamic
> topicparittions
> 
> Regarding your multi-DC questions, I'm not really clear on what you're saying.
> 
> On Mon, Apr 18, 2016 at 11:18 AM, Erwan ALLAIN <eallain.po...@gmail.com 
> <mailto:eallain.po...@gmail.com>> wrote:
> > Hello,
> >
> > I'm currently designing a solution where 2 distinct clusters Spark (2
> > datacenters) share the same Kafka (Kafka rack aware or manual broker
> > repartition).
> > The aims are
> > - preventing DC crash: using kafka resiliency and consumer group mechanism
> > (or else ?)
> > - keeping consistent offset among replica (vs mirror maker,which does not
> > keep offset)
> >
> > I have several questions
> >
> > 1) Dynamic repartition (one or 2 DC)
> >
> > I'm using KafkaDirectStream which map one partition kafka with one spark. Is
> > it possible to handle new or removed partition ?
> > In the compute method, it looks like we are always using the currentOffset
> > map to query the next batch and therefore it's always the same number of
> > partition ? Can we request metadata at each batch ?
> >
> > 2) Multi DC Spark
> >
> > Using Direct approach, a way to achieve this would be
> > - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> > - only one is reading the partition (Check every x interval, "lock" stored
> > in cassandra for instance)
> >
> > => not sure if it works just an idea
> >
> > Using Consumer Group
> > - CommitOffset manually at the end of the batch
> >
> > => Does spark handle partition rebalancing ?
> >
> > I'd appreciate any ideas ! Let me know if it's not clear.
> >
> > Erwan
> >
> >
> 
> 



Re: Spark Streaming / Kafka Direct Approach: Dynamic Partitionning / Multi DC Spark

2016-04-18 Thread Jason Nerothin
Hi Erwan,

You might consider InsightEdge: http://insightedge.io <http://insightedge.io/> 
. It has the capability of doing WAN between data grids and would save you the 
work of having to re-invent the wheel. Additionally, RDDs can be shared between 
developers in the same DC.

Thanks,
Jason

> On Apr 18, 2016, at 11:18 AM, Erwan ALLAIN <eallain.po...@gmail.com> wrote:
> 
> Hello,
> 
> I'm currently designing a solution where 2 distinct clusters Spark (2 
> datacenters) share the same Kafka (Kafka rack aware or manual broker 
> repartition). 
> The aims are
> - preventing DC crash: using kafka resiliency and consumer group mechanism 
> (or else ?)
> - keeping consistent offset among replica (vs mirror maker,which does not 
> keep offset)
> 
> I have several questions 
> 
> 1) Dynamic repartition (one or 2 DC)
> 
> I'm using KafkaDirectStream which map one partition kafka with one spark. Is 
> it possible to handle new or removed partition ? 
> In the compute method, it looks like we are always using the currentOffset 
> map to query the next batch and therefore it's always the same number of 
> partition ? Can we request metadata at each batch ?
> 
> 2) Multi DC Spark
> 
> Using Direct approach, a way to achieve this would be 
> - to "assign" (kafka 0.9 term) all topics to the 2 sparks
> - only one is reading the partition (Check every x interval, "lock" stored in 
> cassandra for instance)
> 
> => not sure if it works just an idea
> 
> Using Consumer Group
> - CommitOffset manually at the end of the batch
> 
> => Does spark handle partition rebalancing ?
> 
> I'd appreciate any ideas ! Let me know if it's not clear.
> 
> Erwan
> 
> 



Re: drools on spark, how to reload rule file?

2016-04-18 Thread Jason Nerothin
Could you post some psuedo-code? 

val data = rdd.whatever(…)
val facts: Array[DroolsCompatibleType] = convert(data)

facts.map{ f => ksession.insert( f ) }


> On Apr 18, 2016, at 9:20 AM, yaoxiaohua <yaoxiao...@outlook.com> wrote:
> 
> Thanks for your reply , Jason,
> I can use stateless session in spark streaming job.
> But now my question is when the rule update, how to pass it to RDD?
> We generate a ruleExecutor(stateless session) in main method,
> Then pass the ruleExectutor in Rdd.
>  
> I am new in drools, I am trying to read the drools doc now.
> Best Regards,
> Evan
> From: Jason Nerothin [mailto:jasonnerot...@gmail.com 
> <mailto:jasonnerot...@gmail.com>] 
> Sent: 2016年4月18日 21:42
> To: yaoxiaohua
> Cc: user@spark.apache.org <mailto:user@spark.apache.org>
> Subject: Re: drools on spark, how to reload rule file?
>  
> The limitation is in the drools implementation.
>  
> Changing a rule in a stateful KB is not possible, particularly if it leads to 
> logical contradictions with the previous version or any other rule in the KB.
>  
> When we ran into this, we worked around (part of) it by salting the rule name 
> with a unique id. To get the existing rules to be evaluated when we wanted, 
> we kept a property on each fact that we mutated each time. 
>  
> Hackery, but it worked.
>  
> I recommend you try hard to use a stateless KB, if it is possible.
> 
> Thank you.
>  
> Jason
>  
> // brevity and poor typing by iPhone
> 
> On Apr 18, 2016, at 04:43, yaoxiaohua <yaoxiao...@outlook.com 
> <mailto:yaoxiao...@outlook.com>> wrote:
> 
>> Hi bros,
>> I am trying using drools on spark to parse log and do some 
>> rule match and derived some fields.
>> Now I refer one blog on cloudera, 
>> http://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/
>>  
>> <http://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/>
>> 
>> now I want to know whether it possible to reload the rule on 
>> the fly?
>> Thanks in advance.
>>  
>> Best Regards,
>> Evan



Re: drools on spark, how to reload rule file?

2016-04-18 Thread Jason Nerothin
The limitation is in the drools implementation.

Changing a rule in a stateful KB is not possible, particularly if it leads to 
logical contradictions with the previous version or any other rule in the KB.

When we ran into this, we worked around (part of) it by salting the rule name 
with a unique id. To get the existing rules to be evaluated when we wanted, we 
kept a property on each fact that we mutated each time. 

Hackery, but it worked.

I recommend you try hard to use a stateless KB, if it is possible.

Thank you.

Jason

// brevity and poor typing by iPhone

> On Apr 18, 2016, at 04:43, yaoxiaohua <yaoxiao...@outlook.com> wrote:
> 
> Hi bros,
> I am trying using drools on spark to parse log and do some 
> rule match and derived some fields.
> Now I refer one blog on cloudera,
> http://blog.cloudera.com/blog/2015/11/how-to-build-a-complex-event-processing-app-on-apache-spark-and-drools/
>
> now I want to know whether it possible to reload the rule on 
> the fly?
> Thanks in advance.
>  
> Best Regards,
> Evan


Re: local class incompatible: stream classdesc serialVersionUID

2016-01-29 Thread Jason Plurad
I agree with you, Ted, if RDD had a serial version UID this might not be an
issue. So that could be a JIRA to submit to help avoid version mismatches
in future Spark versions, but that doesn't help my current situation
between 1.5.1 and 1.5.2.

Any other ideas? Thanks.
On Thu, Jan 28, 2016 at 5:06 PM Ted Yu <yuzhih...@gmail.com> wrote:

> I am not Scala expert.
>
> RDD extends Serializable but doesn't have @SerialVersionUID() annotation.
> This may explain what you described.
>
> One approach is to add @SerialVersionUID so that RDD's have stable serial
> version UID.
>
> Cheers
>
> On Thu, Jan 28, 2016 at 1:38 PM, Jason Plurad <plur...@gmail.com> wrote:
>
>> I've searched through the mailing list archive. It seems that if you try
>> to run, for example, a Spark 1.5.2 program against a Spark 1.5.1 standalone
>> server, you will run into an exception like this:
>>
>> WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage
>> 0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
>> org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
>> serialVersionUID = -3343649307726848892, local class serialVersionUID =
>> -3996494161745401652
>>
>> If my application is using a library that builds against Spark 1.5.2,
>> does that mean that my application is now tied to that same Spark
>> standalone server version?
>>
>> Is there a recommended way for that library to have a Spark dependency
>> but keep it compatible against a wider set of versions, i.e. any version
>> 1.5.x?
>>
>> Thanks!
>>
>
>


local class incompatible: stream classdesc serialVersionUID

2016-01-28 Thread Jason Plurad
I've searched through the mailing list archive. It seems that if you try to
run, for example, a Spark 1.5.2 program against a Spark 1.5.1 standalone
server, you will run into an exception like this:

WARN  org.apache.spark.scheduler.TaskSetManager  - Lost task 0.0 in stage
0.0 (TID 0, 192.168.14.103): java.io.InvalidClassException:
org.apache.spark.rdd.RDD; local class incompatible: stream classdesc
serialVersionUID = -3343649307726848892, local class serialVersionUID =
-3996494161745401652

If my application is using a library that builds against Spark 1.5.2, does
that mean that my application is now tied to that same Spark standalone
server version?

Is there a recommended way for that library to have a Spark dependency but
keep it compatible against a wider set of versions, i.e. any version 1.5.x?

Thanks!


Efficient join multiple times

2016-01-08 Thread Jason White
I'm trying to join a contant large-ish RDD to each RDD in a DStream, and I'm
trying to keep the join as efficient as possible so each batch finishes
within the batch window. I'm using PySpark on 1.6

I've tried the trick of keying the large RDD into (k, v) pairs and using
.partitionBy(100).persist() to pre-partition it for each join. This works,
and definitely cuts down on the time. The dstream is also mapped to matching
(k, v) pairs.

Then I'm joining the data using:
my_dstream.transform(lambda rdd: rdd.leftOuterJoin(big_rdd))

What I'm seeing happening is that, while the right side is partitioned
exactly once, thus saving me an expensive shuffle each batch, the data is
still being transferred across the network each batch. This is putting me up
to or beyond my batch window.

I thought the point of the .partitionBy() call was to persist the data at a
fixed set of nodes, and have the data from the smaller RDD shuffled to those
nodes?

I've also tried using a .rightOuterJoin instead, it appears to make no
difference. Any suggestions?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-join-multiple-times-tp25922.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Upgrading Spark in EC2 clusters

2015-11-12 Thread Jason Rubenstein
Hi,

With some minor changes to spark-ec2/spark/init.sh and writing your own
 "upgrade-spark.sh" script, you can upgrade spark in place.

(Make sure to call not only spark/init.sh but also spark/setup.sh, because
the latter uses copy-dir to get your ner version of spark to the slaves)

I wrote one so we could upgrade to a specific version of Spark (via
commit-hash) and used it to upgrade from 1.4.1. to 1.5.0

best,
Jason


On Thu, Nov 12, 2015 at 9:49 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> spark-ec2 does not offer a way to upgrade an existing cluster, and from
> what I gather, it wasn't intended to be used to manage long-lasting
> infrastructure. The recommended approach really is to just destroy your
> existing cluster and launch a new one with the desired configuration.
>
> If you want to upgrade the cluster in place, you'll probably have to do
> that manually. Otherwise, perhaps spark-ec2 is not the right tool, and
> instead you want one of those "grown-up" management tools like Ansible
> which can be setup to allow in-place upgrades. That'll take a bit of work,
> though.
>
> Nick
>
> On Wed, Nov 11, 2015 at 6:01 PM Augustus Hong <augus...@branchmetrics.io>
> wrote:
>
>> Hey All,
>>
>> I have a Spark cluster(running version 1.5.0) on EC2 launched with the
>> provided spark-ec2 scripts. If I want to upgrade Spark to 1.5.2 in the same
>> cluster, what's the safest / recommended way to do that?
>>
>>
>> I know I can spin up a new cluster running 1.5.2, but it doesn't seem
>> efficient to spin up a new cluster every time we need to upgrade.
>>
>>
>> Thanks,
>> Augustus
>>
>>
>>
>>
>>
>> --
>> [image: Branch Metrics mobile deep linking] <http://branch.io/>* Augustus
>> Hong*
>>  Data Analytics | Branch Metrics
>>  m 650-391-3369 | e augus...@branch.io
>>
>


Re: PySpark + Streaming + DataFrames

2015-11-02 Thread Jason White
This should be resolved with
https://github.com/apache/spark/commit/f92f334ca47c03b980b06cf300aa652d0ffa1880.
The conversion no longer does a `.take` when converting from RDD -> DF.


On Mon, Oct 19, 2015 at 6:30 PM, Tathagata Das <t...@databricks.com> wrote:

> Yes, precisely! Also, for other folks who may read this, could reply back
> with the trusted conversion that worked for you (for a clear solution)?
>
> TD
>
>
> On Mon, Oct 19, 2015 at 3:08 PM, Jason White <jason.wh...@shopify.com>
> wrote:
>
>> Ah, that makes sense then, thanks TD.
>>
>> The conversion from RDD -> DF involves a `.take(10)` in PySpark, even if
>> you provide the schema, so I was avoiding back-and-forth conversions. I’ll
>> see if I can create a ‘trusted’ conversion that doesn’t involve the `take`.
>>
>> --
>> Jason
>>
>> On October 19, 2015 at 5:23:59 PM, Tathagata Das (t...@databricks.com)
>> wrote:
>>
>> RDD and DF are not compatible data types. So you cannot return a DF when
>> you have to return an RDD. What rather you can do is return the underlying
>> RDD of the dataframe by dataframe.rdd().
>>
>>
>> On Fri, Oct 16, 2015 at 12:07 PM, Jason White <jason.wh...@shopify.com>
>> wrote:
>>
>>> Hi Ken, thanks for replying.
>>>
>>> Unless I'm misunderstanding something, I don't believe that's correct.
>>> Dstream.transform() accepts a single argument, func. func should be a
>>> function that accepts a single RDD, and returns a single RDD. That's what
>>> transform_to_df does, except the RDD it returns is a DF.
>>>
>>> I've used Dstream.transform() successfully in the past when transforming
>>> RDDs, so I don't think my problem is there.
>>>
>>> I haven't tried this in Scala yet, and all of the examples I've seen on
>>> the
>>> website seem to use foreach instead of transform. Does this approach
>>> work in
>>> Scala?
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: PySpark + Streaming + DataFrames

2015-10-19 Thread Jason White
Ah, that makes sense then, thanks TD.

The conversion from RDD -> DF involves a `.take(10)` in PySpark, even if you 
provide the schema, so I was avoiding back-and-forth conversions. I’ll see if I 
can create a ‘trusted’ conversion that doesn’t involve the `take`.

-- 
Jason

On October 19, 2015 at 5:23:59 PM, Tathagata Das (t...@databricks.com) wrote:

RDD and DF are not compatible data types. So you cannot return a DF when you 
have to return an RDD. What rather you can do is return the underlying RDD of 
the dataframe by dataframe.rdd(). 


On Fri, Oct 16, 2015 at 12:07 PM, Jason White <jason.wh...@shopify.com> wrote:
Hi Ken, thanks for replying.

Unless I'm misunderstanding something, I don't believe that's correct.
Dstream.transform() accepts a single argument, func. func should be a
function that accepts a single RDD, and returns a single RDD. That's what
transform_to_df does, except the RDD it returns is a DF.

I've used Dstream.transform() successfully in the past when transforming
RDDs, so I don't think my problem is there.

I haven't tried this in Scala yet, and all of the examples I've seen on the
website seem to use foreach instead of transform. Does this approach work in
Scala?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




PySpark + Streaming + DataFrames

2015-10-16 Thread Jason White
I'm trying to create a DStream of DataFrames using PySpark. I receive data
from Kafka in the form of a JSON string, and I'm parsing these RDDs of
Strings into DataFrames.

My code is:


I get the following error at pyspark/streaming/util.py, line 64:


I've verified that the sqlContext is properly creating a DataFrame. The
issue is in the return value in the callback. Am I doing something wrong in
the DStream transform? I suspect it may be a problem in the DStream
implementation, given that it's expecting a `_jrdd` attribute.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark + Streaming + DataFrames

2015-10-16 Thread Jason White
Hi Ken, thanks for replying.

Unless I'm misunderstanding something, I don't believe that's correct.
Dstream.transform() accepts a single argument, func. func should be a
function that accepts a single RDD, and returns a single RDD. That's what
transform_to_df does, except the RDD it returns is a DF.

I've used Dstream.transform() successfully in the past when transforming
RDDs, so I don't think my problem is there.

I haven't tried this in Scala yet, and all of the examples I've seen on the
website seem to use foreach instead of transform. Does this approach work in
Scala?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Streaming-DataFrames-tp25095p25099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark Checkpoints with Broadcast Variables

2015-09-29 Thread Jason White
I'm having trouble loading a streaming job from a checkpoint when a
broadcast variable is defined. I've seen the solution by TD in Scala (
https://issues.apache.org/jira/browse/SPARK-5206) that uses a singleton to
get/create an accumulator, but I can't seem to get it to work in PySpark
with a broadcast variable.

A simplified code snippet:
broadcastHelper = {}

class StreamingJob(object):
def transform_function(self):
def transform_function_inner(t, rdd):
if 'bar' not in broadcastHelper:
broadcastHelper['bar'] =
rdd.context.broadcast(broadcastHelper['foo'])
return rdd.filter(lambda event: event['id'] not in
broadcastHelper['bar'].value)
return transform_function_inner

def createContext(self):
dstream = self.getKafkaStream()
dstream = dstream.transform(self.transform_function())
dstream.foreachRdd(lambda rdd:
rdd.foreachPartition(self.send_partition))

def run(self):
broadcastHelper['foo'] = {1, 2, 3}
ssc = StreamingContext.getOrCreate(self.checkpoint_path,
self.createContext)
ssc.start()
ssc.awaitTermination()

The error I inevitably get when restoring from the checkpoint is:
Exception: (Exception("Broadcast variable '3' not loaded!",), , (3L,))

Has anyone had any luck checkpointing in PySpark with a broadcast variable?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-Checkpoints-with-Broadcast-Variables-tp24863.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

PySpark Checkpoints with Broadcast Variables

2015-09-29 Thread Jason White
I'm having trouble loading a streaming job from a checkpoint when a
broadcast variable is defined. I've seen the solution by TD in Scala (
https://issues.apache.org/jira/browse/SPARK-5206) that uses a singleton to
get/create an accumulator, but I can't seem to get it to work in PySpark
with a broadcast variable.

A simplified code snippet:
broadcastHelper = {}

class StreamingJob(object):
def transform_function(self):
def transform_function_inner(t, rdd):
if 'bar' not in broadcastHelper:
broadcastHelper['bar'] =
rdd.context.broadcast(broadcastHelper['foo'])
return rdd.filter(lambda event: event['id'] not in
broadcastHelper['bar'].value)
return transform_function_inner

def createContext(self):
dstream = self.getKafkaStream()
dstream = dstream.transform(self.transform_function())
dstream.foreachRdd(lambda rdd:
rdd.foreachPartition(self.send_partition))

def run(self):
broadcastHelper['foo'] = {1, 2, 3}
ssc = StreamingContext.getOrCreate(self.checkpoint_path,
self.createContext)
ssc.start()
ssc.awaitTermination()

The error I inevitably get when restoring from the checkpoint is:
Exception: (Exception("Broadcast variable '3' not loaded!",), , (3L,))

Has anyone had any luck checkpointing in PySpark with a broadcast variable?


Re: Dynamic lookup table

2015-08-28 Thread Jason
Hi Nikunj,

Depending on what kind of stats you want to accumulate, you may want to
look into the Accumulator/Accumulable API, or if you need more control, you
can store these things in an external key-value store (HBase, redis, etc..)
and do careful updates there. Though be careful and make sure your updates
are atomic (transactions or CAS semantics) or you could run into race
condition problems.

Jason

On Fri, Aug 28, 2015 at 11:39 AM N B nb.nos...@gmail.com wrote:

 Hi all,

 I have the following use case that I wanted to get some insight on how to
 go about doing in Spark Streaming.

 Every batch is processed through the pipeline and at the end, it has to
 update some statistics information. This updated info should be reusable in
 the next batch of this DStream e.g for looking up the relevant stat and it
 in turn refines the stats further. It has to continue doing this for every
 batch processed. First batch in the DStream can work with empty stats
 lookup without issue. Essentially, we are trying to do a feedback loop.

 What is a good pattern to apply for something like this? Some approaches
 that I considered are:

 1. Use updateStateByKey(). But this produces a new DStream that I cannot
 refer back in the pipeline, so seems like a no-go but would be happy to be
 proven wrong.

 2. Use broadcast variables to maintain this state in a Map for example and
 continue re-brodcasting it after every batch. I am not sure if this has
 performance implications or if its even a good idea.

 3. IndexedRDD? Looked promising initially but I quickly realized that it
 might have the same issue as the updateStateByKey() approach, i.e. its not
 available in the pipeline before its created.

 4. Any other ideas that are obvious and I am missing?

 Thanks
 Nikunj




Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Jason
Ahh yes, thanks for mentioning data skew, I've run into that before as
well. The best way there is to get statistics on the distribution of your
join key. If there are a few values with drastically larger number of
values, then a reducer task will always be swamped no matter how many
reducer side partitions you use.

If this is the problem, then one solution I have used is to do a skew join
manually. Something like:

SELECT * FROM (SELECT * from table WHERE joinkey  'commonval') t1 JOIN t2
ON t1.joinkey == t2.joinkey
UNION ALL
SELECT * FROM (SELECT * from table WHERE joinkey = 'commonval') t1 JOIN t2
ON t1.joinkey == t2.joinkey


On Fri, Aug 28, 2015 at 1:56 PM Thomas Dudziak tom...@gmail.com wrote:

 Yeah, I tried with 10k and 30k and these still failed, will try with more
 then. Though that is a little disappointing, it only writes ~7TB of shuffle
 data which shouldn't in theory require more than 1000 reducers on my 10TB
 memory cluster (~7GB of spill per reducer).
 I'm now wondering if my shuffle partitions are uneven and I should use a
 custom partitioner, is there a way to get stats on the partition sizes from
 Spark ?

 On Fri, Aug 28, 2015 at 12:46 PM, Jason ja...@jasonknight.us wrote:

 I had similar problems to this (reduce side failures for large joins
 (25bn rows with 9bn)), and found the answer was to further up the
 spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for
 me, but your tables look a little denser, so you may want to go even higher.

 On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like Removing executor with no recent heartbeats 
 Missing an output location for shuffle errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with
 70 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits
 (32g per executor, 2 workers per machine) and can't allocate any more stuff
 in the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
1: 24913959511958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom





Re: Getting number of physical machines in Spark

2015-08-28 Thread Jason
I've wanted similar functionality too: when network IO bound (for me I was
trying to pull things from s3 to hdfs) I wish there was a `.mapMachines`
api where I wouldn't have to try guess at the proper partitioning of a
'driver' RDD for `sc.parallelize(1 to N, N).map( i= pull the i'th chunk
from S3 )`.

On Thu, Aug 27, 2015 at 10:01 AM Young, Matthew T matthew.t.yo...@intel.com
wrote:

 What’s the canonical way to find out the number of physical machines in a
 cluster at runtime in Spark? I believe SparkContext.defaultParallelism will
 give me the number of cores, but I’m interested in the number of NICs.



 I’m writing a Spark streaming application to ingest from Kafka with the
 Receiver API and want to create one DStream per physical machine for read
 parallelism’s sake. How can I figure out at run time how many machines
 there are so I know how many DStreams to create?



Re: Alternative to Large Broadcast Variables

2015-08-28 Thread Jason
You could try using an external key value store (like HBase, Redis) and
perform lookups/updates inside of your mappers (you'd need to create the
connection within a mapPartitions code block to avoid the connection
setup/teardown overhead)?

I haven't done this myself though, so I'm just throwing the idea out there.

On Fri, Aug 28, 2015 at 3:39 AM Hemminger Jeff j...@atware.co.jp wrote:

 Hi,

 I am working on a Spark application that is using of a large (~3G)
 broadcast variable as a lookup table. The application refines the data in
 this lookup table in an iterative manner. So this large variable is
 broadcast many times during the lifetime of the application process.

 From what I have observed perhaps 60% of the execution time is spent
 waiting for the variable to broadcast in each iteration. My reading of a
 Spark performance article[1] suggests that the time spent broadcasting will
 increase with the number of nodes I add.

 My question for the group - what would you suggest as an alternative to
 broadcasting a large variable like this?

 One approach I have considered is segmenting my RDD and adding a copy of
 the lookup table for each X number of values to process. So, for example,
 if I have a list of 1 million entries to process (eg, RDD[Entry]), I could
 split this into segments of 100K entries, with a copy of the lookup table,
 and make that an RDD[(Lookup, Array[Entry]).

 Another solution I am looking at it is making the lookup table an RDD
 instead of a broadcast variable. Perhaps I could use an IndexedRDD[2] to
 improve performance. One issue with this approach is that I would have to
 rewrite my application code to use two RDDs so that I do not reference the
 lookup RDD in the from within the closure of another RDD.

 Any other recommendations?

 Jeff


 [1]
 http://www.cs.berkeley.edu/~agearh/cs267.sp10/files/mosharaf-spark-bc-report-spring10.pdf

 [2]https://github.com/amplab/spark-indexedrdd



Re: How to avoid shuffle errors for a large join ?

2015-08-28 Thread Jason
I had similar problems to this (reduce side failures for large joins (25bn
rows with 9bn)), and found the answer was to further up the
spark.sql.shuffle.partitions=1000. In my case, 16k partitions worked for
me, but your tables look a little denser, so you may want to go even higher.

On Thu, Aug 27, 2015 at 6:04 PM Thomas Dudziak tom...@gmail.com wrote:

 I'm getting errors like Removing executor with no recent heartbeats 
 Missing an output location for shuffle errors for a large SparkSql join
 (1bn rows/2.5TB joined with 1bn rows/30GB) and I'm not sure how to
 configure the job to avoid them.

 The initial stage completes fine with some 30k tasks on a cluster with 70
 machines/10TB memory, generating about 6.5TB of shuffle writes, but then
 the shuffle stage first waits 30min in the scheduling phase according to
 the UI, and then dies with the mentioned errors.

 I can see in the GC logs that the executors reach their memory limits (32g
 per executor, 2 workers per machine) and can't allocate any more stuff in
 the heap. Fwiw, the top 10 in the memory use histogram are:

 num #instances #bytes  class name
 --
1: 24913959511958700560
  scala.collection.immutable.HashMap$HashMap1
2: 251085327 8034730464  scala.Tuple2
3: 243694737 5848673688  java.lang.Float
4: 231198778 5548770672  java.lang.Integer
5:  72191585 4298521576  [Lscala.collection.immutable.HashMap;
6:  72191582 2310130624
  scala.collection.immutable.HashMap$HashTrieMap
7:  74114058 1778737392  java.lang.Long
8:   6059103  779203840  [Ljava.lang.Object;
9:   5461096  174755072  scala.collection.mutable.ArrayBuffer
   10: 34749   70122104  [B

 Relevant settings are (Spark 1.4.1, Java 8 with G1 GC):

 spark.core.connection.ack.wait.timeout 600
 spark.executor.heartbeatInterval   60s
 spark.executor.memory  32g
 spark.mesos.coarse false
 spark.network.timeout  600s
 spark.shuffle.blockTransferService netty
 spark.shuffle.consolidateFiles true
 spark.shuffle.file.buffer  1m
 spark.shuffle.io.maxRetries6
 spark.shuffle.manager  sort

 The join is currently configured with spark.sql.shuffle.partitions=1000
 but that doesn't seem to help. Would increasing the partitions help ? Is
 there a formula to determine an approximate partitions number value for a
 join ?
 Any help with this job would be appreciated !

 cheers,
 Tom



Persisting sorted parquet tables for future sort merge joins

2015-08-26 Thread Jason
I want to persist a large _sorted_ table to Parquet on S3 and then read
this in and join it using the Sorted Merge Join strategy against another
large sorted table.

The problem is: even though I sort these tables on the join key beforehand,
once I persist them to Parquet, they lose the information about their
sortedness. Is there anyway to hint to Spark that they do not need to be
resorted the next time I read them in?

I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[ TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:/sorted.parquet][pos#2848424]]
[ TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:exploded_sorted.parquet][pos#2.399]]

As you can see, this plan exchanges and sorts the data before performing
the SortMergeJoin even though these parquet tables are already sorted.

Thanks,
Jason


Persisting sorted parquet tables for future sort merge joins

2015-08-25 Thread Jason
I want to persist a large _sorted_ table to Parquet on S3 and then read
this in and join it using the Sorted Merge Join strategy against another
large sorted table.

The problem is: even though I sort these tables on the join key beforehand,
once I persist them to Parquet, they lose the information about their
sortedness. Is there anyway to hint to Spark that they do not need to be
resorted the next time I read them in?

I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[  TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[   TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ConvertToUnsafe]
[ Scan ParquetRelation[file:/sorted.parquet][pos#2848424]]
[  TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[   TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ConvertToUnsafe]
[ Scan ParquetRelation[file:exploded_sorted.parquet][pos#2.399]]

Thanks,
Jason


Re: SQL with Spark Streaming

2015-03-11 Thread Jason Dai
Yes, a previous prototype is available
https://github.com/Intel-bigdata/spark-streamsql, and a talk is given at
last year's Spark Summit (
http://spark-summit.org/2014/talk/streamsql-on-spark-manipulating-streams-by-sql-using-spark
)

We are currently porting the prototype to use the latest DataFrame API, and
will provide a stable version for people to try soon.

Thabnks,
-Jason


On Wed, Mar 11, 2015 at 9:12 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 Hi,

 On Wed, Mar 11, 2015 at 9:33 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Intel has a prototype for doing this, SaiSai and Jason are the authors.
 Probably you can ask them for some materials.


 The github repository is here: https://github.com/intel-spark/stream-sql

 Also, what I did is writing a wrapper class SchemaDStream that internally
 holds a DStream[Row] and a DStream[StructType] (the latter having just one
 element in every RDD) and then allows to do
 - operations SchemaRDD = SchemaRDD using
 `rowStream.transformWith(schemaStream, ...)`
 - in particular you can register this stream's data as a table this way
 - and via a companion object with a method `fromSQL(sql: String):
 SchemaDStream` you can get a new stream from previously registered tables.

 However, you are limited to batch-internal operations, i.e., you can't
 aggregate across batches.

 I am not able to share the code at the moment, but will within the next
 months. It is not very advanced code, though, and should be easy to
 replicate. Also, I have no idea about the performance of transformWith

 Tobias




Re: SQL with Spark Streaming

2015-03-11 Thread Jason Dai
Sorry typo; should be https://github.com/intel-spark/stream-sql

Thanks,
-Jason

On Wed, Mar 11, 2015 at 10:19 PM, Irfan Ahmad ir...@cloudphysics.com
wrote:

 Got a 404 on that link: https://github.com/Intel-bigdata/spark-streamsql


 *Irfan Ahmad*
 CTO | Co-Founder | *CloudPhysics* http://www.cloudphysics.com
 Best of VMworld Finalist
 Best Cloud Management Award
 NetworkWorld 10 Startups to Watch
 EMA Most Notable Vendor

 On Wed, Mar 11, 2015 at 6:41 AM, Jason Dai jason@gmail.com wrote:

 Yes, a previous prototype is available
 https://github.com/Intel-bigdata/spark-streamsql, and a talk is given at
 last year's Spark Summit (
 http://spark-summit.org/2014/talk/streamsql-on-spark-manipulating-streams-by-sql-using-spark
 )

 We are currently porting the prototype to use the latest DataFrame API,
 and will provide a stable version for people to try soon.

 Thabnks,
 -Jason


 On Wed, Mar 11, 2015 at 9:12 AM, Tobias Pfeiffer t...@preferred.jp
 wrote:

 Hi,

 On Wed, Mar 11, 2015 at 9:33 AM, Cheng, Hao hao.ch...@intel.com wrote:

  Intel has a prototype for doing this, SaiSai and Jason are the
 authors. Probably you can ask them for some materials.


 The github repository is here: https://github.com/intel-spark/stream-sql

 Also, what I did is writing a wrapper class SchemaDStream that
 internally holds a DStream[Row] and a DStream[StructType] (the latter
 having just one element in every RDD) and then allows to do
 - operations SchemaRDD = SchemaRDD using
 `rowStream.transformWith(schemaStream, ...)`
 - in particular you can register this stream's data as a table this way
 - and via a companion object with a method `fromSQL(sql: String):
 SchemaDStream` you can get a new stream from previously registered tables.

 However, you are limited to batch-internal operations, i.e., you can't
 aggregate across batches.

 I am not able to share the code at the moment, but will within the next
 months. It is not very advanced code, though, and should be easy to
 replicate. Also, I have no idea about the performance of transformWith

 Tobias






Re: Speed Benchmark

2015-02-27 Thread Jason Bell

How many machines are on the cluster?
And what is the configuration of those machines (Cores/RAM)?

Small cluster is very subjective statement.


Guillaume Guy wrote:

Dear Spark users:

I want to see if anyone has an idea of the performance for a small 
cluster.




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Running Example Spark Program

2015-02-22 Thread Jason Bell
If you would like a morr detailed walkthrough I wrote one recently.

https://dataissexy.wordpress.com/2015/02/03/apache-spark-standalone-clusters-bigdata-hadoop-spark/

Regards
Jason Bell
 On 22 Feb 2015 14:16, VISHNU SUBRAMANIAN johnfedrickena...@gmail.com
wrote:

 Try restarting your Spark cluster .
 ./sbin/stop-all.sh
 ./sbin/start-all.sh

 Thanks,
 Vishnu

 On Sun, Feb 22, 2015 at 7:30 PM, Surendran Duraisamy 
 2013ht12...@wilp.bits-pilani.ac.in wrote:

  Hello All,

 I am new to Apache Spark, I am trying to run JavaKMeans.java from Spark
 Examples in my Ubuntu System.

 I downloaded spark-1.2.1-bin-hadoop2.4.tgz
 http://www.apache.org/dyn/closer.cgi/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz
 and started sbin/start-master.sh

 After starting Spark and access http://localhost:8080/ to look at the
 status of my Spark Instance, and it shows as follows.


- *URL:* spark://vm:7077
- *Workers:* 0
- *Cores:* 0 Total, 0 Used
- *Memory:* 0.0 B Total, 0.0 B Used
- *Applications:* 0 Running, 4 Completed
- *Drivers:* 0 Running, 0 Completed
- *Status:* ALIVE

 Number of Cores is 0 and Memory is 0.0B. I think because of this I am
 getting following error when I try to run JavaKMeans.java

 Initial job has not accepted any resources; check your cluster UI to
 ensure that workers are registered and have sufficient memory

 Am I missing any configuration before running sbin/start-master.sh?
  Regards,
 Surendran





Is it possible to store graph directly into HDFS?

2014-12-30 Thread Jason Hong
Dear all:)

We're trying to make a graph using large input data and get a subgraph
applied some filter.

Now, we wanna save this graph to HDFS so that we can load later.

Is it possible to store graph or subgraph directly into HDFS and load it as
a graph for future use?

We will be glad for your suggestion.

Best regards.

Jason Hong







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-store-graph-directly-into-HDFS-tp20908.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is it possible to store graph directly into HDFS?

2014-12-30 Thread Jason Hong
Thanks for your answer, Xuefeng Wu.

But, I don't understand how to save a graph as object. :(

Do you have any sample codes?

2014-12-31 13:27 GMT+09:00 Jason Hong begger3...@gmail.com:

 Thanks for your answer, Xuefeng Wu.

 But, I don't understand how to save a graph as object. :(

 Do you have any sample codes?

 Jason Hong

 2014-12-30 22:31 GMT+09:00 Xuefeng Wu ben...@gmail.com:

 how about save as object?


 Yours, Xuefeng Wu 吴雪峰 敬上

  On 2014年12月30日, at 下午9:27, Jason Hong begger3...@gmail.com wrote:
 
  Dear all:)
 
  We're trying to make a graph using large input data and get a subgraph
  applied some filter.
 
  Now, we wanna save this graph to HDFS so that we can load later.
 
  Is it possible to store graph or subgraph directly into HDFS and load
 it as
  a graph for future use?
 
  We will be glad for your suggestion.
 
  Best regards.
 
  Jason Hong
 
 
 
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-store-graph-directly-into-HDFS-tp20908.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 





Applications status missing when Spark HA(zookeeper) enabled

2014-09-11 Thread jason chen
Hi guys,

I configured Spark with the configuration in spark-env.sh:
export SPARK_DAEMON_JAVA_OPTS=-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=host1:2181,host2:2181,host3:2181
-Dspark.deploy.zookeeper.dir=/spark

And I started spark-shell on one master host1(active):
MASTER=spark://host1:7077,host2:7077 bin/spark-shell

I stop-master.sh on host1, then access host2 web ui, the worker
successfully registered to new master host2,
but the running application, even the completed applications shows nothing,
did I missing anything when I configure spark HA ?

Thanks !


RE: EC2 Cluster script. Shark install fails

2014-07-11 Thread Jason H
Thanks Michael
Missed that point as well as the integration of SQL within the scala shell 
(with setting the SQLContext)Looking forward to feature parity with feature 
releases. (Shark - Spark SQL) 
Cheers.

From: mich...@databricks.com
Date: Thu, 10 Jul 2014 16:20:20 -0700
Subject: Re: EC2 Cluster script. Shark install fails
To: user@spark.apache.org

There is no version of Shark that is compatible with Spark 1.0, however, Spark 
SQL does come included automatically.  More information here:
http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html


http://spark.apache.org/docs/latest/sql-programming-guide.html




On Thu, Jul 10, 2014 at 5:51 AM, Jason H jas...@developer.net.nz wrote:





Hi
Just going though the process of installing Spark 1.0.0 on EC2 and notice that 
the script throws an error when installing shark. 








Unpacking Spark

~/spark-ec2

Initializing shark

~ ~/spark-ec2

ERROR: Unknown Shark version


The install completes in the end but shark is completely missed. Looking for 
info on the best way to manually add this in now that the cluster is setup. Is 
there no Shark version compat with 1.0.0 or this script? 




Any suggestions appreciated.