Regarding structured streaming windows on older data

2019-12-23 Thread Hemant Bhanawat
For demonstration purposes, I was using data with older timestamps in
structured streaming. The data was for the year 2018, the window was 24
hours, and the watermark was 0 seconds. A few things that I saw and could
not explain are:
1. The initial batch of the stream had around 60 windows. It processed all
but the last one.
2. The data for a window is not sent to the writer immediately.
3. If I ingest data for 2019 midway, it is not processed. In fact,
Spark didn't output the 2019 data at all.

Can someone point me to a doc or an explanation of how structured
streaming works with data that has non-current timestamps?

Thanks in advance,
Hemant
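
For reference, a minimal Scala sketch of the kind of query described above,
with a 24-hour event-time window and a 0-second watermark. The file path,
schema, and column names below are illustrative assumptions, not details
from the original setup:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("old-data-windows").getOrCreate()
import spark.implicits._

// Hypothetical source: JSON files carrying 2018 event times in a "ts" column.
val events = spark.readStream
  .schema("ts timestamp, value double")
  .json("/path/to/2018-data")

val counts = events
  .withWatermark("ts", "0 seconds")      // watermark of 0 seconds
  .groupBy(window($"ts", "24 hours"))    // 24-hour tumbling windows
  .count()

// In append mode a window is emitted only after the watermark (the maximum
// event time seen so far, minus the 0-second delay) passes the window's end.
// That is why results are not sent to the writer immediately and the most
// recent window is held back until later data moves the watermark forward.
counts.writeStream
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/old-data-checkpoint")
  .start()
  .awaitTermination()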


Re: mllib + SQL

2018-09-01 Thread Hemant Bhanawat
SQL, in addition to its simplicity, provides a standard way of doing
analysis across multiple databases. That is something users would like for
machine learning as well.

The flexibility of Spark's API is definitely helpful, but a simple and
standard way for new users is desirable when it comes to machine learning.

IMO, SQL on ML should come as an incremental addition to Spark's
capabilities.


On Fri, Aug 31, 2018, 7:14 PM Sean Owen  wrote:

> My $0.02 -- this isn't worthwhile.
>
> Yes, there are ML-in-SQL tools. I'm thinking of MADlib for example. I
> think these hold over from days when someone's only interface to a data
> warehouse was SQL, and so there had to be SQL-language support for invoking
> ML jobs. There was no programmatic alternative.
>
> There's nothing particularly helpful about SQL as a language for
> expressing this, versus simply writing operations in a high-level
> programming language.
>
> Spark is that programmatic paradigm, and offers a more general way to
> express ETL, ML and SQL within their own appropriate DSLs. There's no need
> to also shoehorn Spark ML into Spark SQL.
>
> I also think there's a bit of false abstraction here. The nice thing about
> SQL-only access to these functions is it sounds much simpler, and
> accessible to people that only know SQL and nothing about Python or JVMs.
> In practice, using Spark means having some basic awareness of its
> distributed execution environment. SQL-only analysts would struggle to be
> effective with SQL-only access to Spark.
>
> On Fri, Aug 31, 2018 at 5:05 AM Hemant Bhanawat 
> wrote:
>
>> We allow our users to interact with spark cluster using SQL queries only.
>> That's easy for them. MLLib does not have SQL extensions and we cannot
>> expose it to our users.
>>
>> SQL extensions can further accelerate MLLib's adoption. See
>> https://cloud.google.com/bigquery/docs/bigqueryml-intro.
>>
>> Hemant
>>
>>
>> On Thu, Aug 30, 2018 at 9:41 PM William Benton  wrote:
>>
>>> What are you interested in accomplishing?
>>>
>>> The spark.ml package has provided a machine learning API based on
>>> DataFrames for quite some time.  If you are interested in mixing query
>>> processing and machine learning, this is certainly the best place to start.
>>>
>>> See here:  https://spark.apache.org/docs/latest/ml-guide.html
>>>
>>>
>>> best,
>>> wb
>>>
>>>
>>>
>>> On Thu, Aug 30, 2018 at 1:45 AM Hemant Bhanawat 
>>> wrote:
>>>
>>>> Is there a plan to support SQL extensions for mllib? Or is there an
>>>> effort already underway?
>>>>
>>>> Any information is appreciated.
>>>>
>>>> Thanks in advance.
>>>> Hemant
>>>>
>>>
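
For readers landing on this thread, a minimal Scala sketch of the
DataFrame-based spark.ml API referenced above; the column names and the tiny
training set are illustrative:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-ml-sketch").getOrCreate()
import spark.implicits._

// Tiny illustrative training set: two feature columns and a binary label.
// The same DataFrame could just as well come from a SQL query via
// spark.sql(...), which is how SQL and ML mix today without SQL extensions.
val training = Seq(
  (1.0, 0.5, 1.0),
  (0.0, 1.5, 0.0),
  (1.5, 0.2, 1.0),
  (0.2, 1.1, 0.0)
).toDF("f1", "f2", "label")

val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))
  .setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val model = new Pipeline().setStages(Array(assembler, lr)).fit(training)

model.transform(training)
  .select("f1", "f2", "probability", "prediction")
  .show()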


Re: mllib + SQL

2018-08-31 Thread Hemant Bhanawat
BTW, I can contribute if there is already an effort going on somewhere.

On Fri, Aug 31, 2018 at 3:35 PM Hemant Bhanawat 
wrote:

> We allow our users to interact with spark cluster using SQL queries only.
> That's easy for them. MLLib does not have SQL extensions and we cannot
> expose it to our users.
>
> SQL extensions can further accelerate MLLib's adoption. See
> https://cloud.google.com/bigquery/docs/bigqueryml-intro.
>
> Hemant
>
>
> On Thu, Aug 30, 2018 at 9:41 PM William Benton  wrote:
>
>> What are you interested in accomplishing?
>>
>> The spark.ml package has provided a machine learning API based on
>> DataFrames for quite some time.  If you are interested in mixing query
>> processing and machine learning, this is certainly the best place to start.
>>
>> See here:  https://spark.apache.org/docs/latest/ml-guide.html
>>
>>
>> best,
>> wb
>>
>>
>>
>> On Thu, Aug 30, 2018 at 1:45 AM Hemant Bhanawat 
>> wrote:
>>
>>> Is there a plan to support SQL extensions for mllib? Or is there an
>>> effort already underway?
>>>
>>> Any information is appreciated.
>>>
>>> Thanks in advance.
>>> Hemant
>>>
>>


Re: mllib + SQL

2018-08-31 Thread Hemant Bhanawat
We allow our users to interact with the Spark cluster using SQL queries
only. That's easy for them. MLlib does not have SQL extensions, so we cannot
expose it to our users.

SQL extensions could further accelerate MLlib's adoption. See
https://cloud.google.com/bigquery/docs/bigqueryml-intro.

Hemant


On Thu, Aug 30, 2018 at 9:41 PM William Benton  wrote:

> What are you interested in accomplishing?
>
> The spark.ml package has provided a machine learning API based on
> DataFrames for quite some time.  If you are interested in mixing query
> processing and machine learning, this is certainly the best place to start.
>
> See here:  https://spark.apache.org/docs/latest/ml-guide.html
>
>
> best,
> wb
>
>
>
> On Thu, Aug 30, 2018 at 1:45 AM Hemant Bhanawat 
> wrote:
>
>> Is there a plan to support SQL extensions for mllib? Or is there an
>> effort already underway?
>>
>> Any information is appreciated.
>>
>> Thanks in advance.
>> Hemant
>>
>


mllib + SQL

2018-08-30 Thread Hemant Bhanawat
Is there a plan to support SQL extensions for mllib? Or is there an effort
already underway?

Any information is appreciated.

Thanks in advance.
Hemant


Re: Sorting on a streaming dataframe

2018-05-01 Thread Hemant Bhanawat
Opened an issue. https://issues.apache.org/jira/browse/SPARK-24144

Since it is a major issue for us, I have marked its priority as Major. Feel
free to change that if it is not the case from Spark's perspective.

On Tue, May 1, 2018 at 4:34 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Please open a JIRA then!
>
> On Fri, Apr 27, 2018 at 3:59 AM Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> I see.
>>
>> monotonically_increasing_id on streaming dataFrames will be really
>> helpful to me and I believe to many more users. Adding this functionality
>> in Spark would be efficient in terms of performance as compared to
>> implementing this functionality inside the applications.
>>
>> Hemant
>>
>> On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> The basic tenet of structured streaming is that a query should return
>>> the same answer in streaming or batch mode. We support sorting in complete
>>> mode because we have all the data and can sort it correctly and return the
>>> full answer.  In update or append mode, sorting would only return a correct
>>> answer if we could promise that records that sort lower are going to arrive
>>> later (and we can't).  Therefore, it is disallowed.
>>>
>>> If you are just looking for a unique, stable id and you are already
>>> using kafka as the source, you could just combine the partition id and the
>>> offset. The structured streaming connector to Kafka
>>> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
>>> exposes both of these in the schema of the streaming DataFrame. (similarly
>>> for kinesis you can use the shard id and sequence number)
>>>
>>> If you need the IDs to be contiguous, then this is a somewhat
>>> fundamentally hard problem.  I think the best we could do is add support
>>> for monotonically_increasing_id() in streaming dataframes.
>>>
>>> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chaya...@gmail.com>
>>> wrote:
>>>
>>>> Perhaps your use case fits to Apache Kafka better.
>>>>
>>>> More info at:
>>>> https://kafka.apache.org/documentation/streams/
>>>>
>>>> Everything really comes down to the architecture design and algorithm
>>>> spec. However, from my experience with Spark, there are many good reasons
>>>> why this requirement is not supported ;)
>>>>
>>>> Best,
>>>>
>>>> Chayapan (A)
>>>>
>>>>
>>>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9...@gmail.com>
>>>> wrote:
>>>>
>>>> Thanks Chris. There are many ways in which I can solve this problem but
>>>> they are cumbersome. The easiest way would have been to sort the streaming
>>>> dataframe. The reason I asked this question is because I could not find a
>>>> reason why sorting on streaming dataframe is disallowed.
>>>>
>>>> Hemant
>>>>
>>>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
>>>> chris.bow...@microfocus.com> wrote:
>>>>
>>>>> You can happily sort the underlying RDD of InternalRow(s) inside a
>>>>> sink, assuming you are willing to implement and maintain your own sink(s).
>>>>> That is, just grabbing the parquet sink, etc. isn’t going to work out of
>>>>> the box. Alternatively map/flatMapGroupsWithState is probably sufficient
>>>>> and requires less working knowledge to make effective reuse of internals.
>>>>> Just group by foo and then sort accordingly and assign ids. The id counter
>>>>> can be stateful per group. Sometimes this problem may not need to be 
>>>>> solved
>>>>> at all. For example, if you are using kafka, a proper partitioning scheme
>>>>> and message offsets may be “good enough”.
>>>>> --
>>>>> *From:* Hemant Bhanawat <hemant9...@gmail.com>
>>>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>>>> *To:* Reynold Xin
>>>>> *Cc:* dev
>>>>> *Subject:* Re: Sorting on a streaming dataframe
>>>>>
>>>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>>>> incoming records. For that, we are zipping the streaming rdds with that
>>>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>>>> records in the streaming dataframe gets counters in random order but the
>>>>> counter should always be incrementing.

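For context, a small Scala sketch of what monotonically_increasing_id
already provides on a batch DataFrame; the streaming support requested in
SPARK-24144 is the part that does not exist yet:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().appName("mono-id-sketch").getOrCreate()

// Batch behaviour today: ids are increasing and unique but not contiguous,
// because the partition id is encoded in the upper bits of the 64-bit value.
val batch = spark.range(0, 10).toDF("value")
batch.withColumn("snapshot_id", monotonically_increasing_id()).show()

// SPARK-24144 asks for the same function on a streaming DataFrame
// (spark.readStream...), with the counter checkpointed across failures.

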
[jira] [Updated] (SPARK-24144) monotonically_increasing_id on streaming dataFrames

2018-05-01 Thread Hemant Bhanawat (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-24144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hemant Bhanawat updated SPARK-24144:

Priority: Major  (was: Minor)

> monotonically_increasing_id on streaming dataFrames
> ---
>
> Key: SPARK-24144
> URL: https://issues.apache.org/jira/browse/SPARK-24144
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>    Reporter: Hemant Bhanawat
>Priority: Major
>
> For our use case, we want to assign snapshot ids (incrementing counters) to 
> the incoming records. In case of failures, the same record should get the 
> same id after failure so that the downstream DB can handle the records in a 
> correct manner. 
> We were trying to do this by zipping the streaming rdds with that counter 
> using a modified version of ZippedWithIndexRDD. There are other ways to do 
> that but it turns out all ways are cumbersome and error prone in failure 
> scenarios.
> As suggested on the spark user dev list, one way to do this would be to 
> support monotonically_increasing_id on streaming dataFrames in Spark code 
> base. This would ensure that counters are incrementing for the records of the 
> stream. Also, since the counter can be checkpointed, it would work well in 
> case of failure scenarios. Last but not the least, doing this in spark would 
> be the most performance efficient way.
>  






[jira] [Created] (SPARK-24144) monotonically_increasing_id on streaming dataFrames

2018-05-01 Thread Hemant Bhanawat (JIRA)
Hemant Bhanawat created SPARK-24144:
---

 Summary: monotonically_increasing_id on streaming dataFrames
 Key: SPARK-24144
 URL: https://issues.apache.org/jira/browse/SPARK-24144
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 2.3.0
Reporter: Hemant Bhanawat


For our use case, we want to assign snapshot ids (incrementing counters) to the 
incoming records. In case of failures, the same record should get the same id 
after failure so that the downstream DB can handle the records in a correct 
manner. 

We were trying to do this by zipping the streaming rdds with that counter using 
a modified version of ZippedWithIndexRDD. There are other ways to do that but 
it turns out all ways are cumbersome and error prone in failure scenarios.

As suggested on the spark user dev list, one way to do this would be to support 
monotonically_increasing_id on streaming dataFrames in Spark code base. This 
would ensure that counters are incrementing for the records of the stream. 
Also, since the counter can be checkpointed, it would work well in case of 
failure scenarios. Last but not the least, doing this in spark would be the 
most performance efficient way.

 






Re: Sorting on a streaming dataframe

2018-04-27 Thread Hemant Bhanawat
I see.

monotonically_increasing_id on streaming DataFrames would be really helpful
to me and, I believe, to many more users. Adding this functionality in Spark
would be more efficient in terms of performance than implementing it inside
the applications.

Hemant

On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The basic tenet of structured streaming is that a query should return the
> same answer in streaming or batch mode. We support sorting in complete mode
> because we have all the data and can sort it correctly and return the full
> answer.  In update or append mode, sorting would only return a correct
> answer if we could promise that records that sort lower are going to arrive
> later (and we can't).  Therefore, it is disallowed.
>
> If you are just looking for a unique, stable id and you are already using
> kafka as the source, you could just combine the partition id and the
> offset. The structured streaming connector to Kafka
> <https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html>
> exposes both of these in the schema of the streaming DataFrame. (similarly
> for kinesis you can use the shard id and sequence number)
>
> If you need the IDs to be contiguous, then this is a somewhat
> fundamentally hard problem.  I think the best we could do is add support
> for monotonically_increasing_id() in streaming dataframes.
>
> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha <chaya...@gmail.com>
> wrote:
>
>> Perhaps your use case fits to Apache Kafka better.
>>
>> More info at:
>> https://kafka.apache.org/documentation/streams/
>>
>> Everything really comes down to the architecture design and algorithm
>> spec. However, from my experience with Spark, there are many good reasons
>> why this requirement is not supported ;)
>>
>> Best,
>>
>> Chayapan (A)
>>
>>
>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat <hemant9...@gmail.com>
>> wrote:
>>
>> Thanks Chris. There are many ways in which I can solve this problem but
>> they are cumbersome. The easiest way would have been to sort the streaming
>> dataframe. The reason I asked this question is because I could not find a
>> reason why sorting on streaming dataframe is disallowed.
>>
>> Hemant
>>
>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
>> chris.bow...@microfocus.com> wrote:
>>
>>> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
>>> assuming you are willing to implement and maintain your own sink(s). That
>>> is, just grabbing the parquet sink, etc. isn’t going to work out of the
>>> box. Alternatively map/flatMapGroupsWithState is probably sufficient and
>>> requires less working knowledge to make effective reuse of internals. Just
>>> group by foo and then sort accordingly and assign ids. The id counter can
>>> be stateful per group. Sometimes this problem may not need to be solved at
>>> all. For example, if you are using kafka, a proper partitioning scheme and
>>> message offsets may be “good enough”.
>>> --
>>> *From:* Hemant Bhanawat <hemant9...@gmail.com>
>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>> *To:* Reynold Xin
>>> *Cc:* dev
>>> *Subject:* Re: Sorting on a streaming dataframe
>>>
>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>> incoming records. For that, we are zipping the streaming rdds with that
>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>> records in the streaming dataframe gets counters in random order but the
>>> counter should always be incrementing.
>>>
>>> This is working fine until we have a failure. When we have a failure, we
>>> re-assign the records to snapshot ids  and this time same snapshot id can
>>> get assigned to a different record. This is a problem because the primary
>>> key in our storage engine is <recordid, snapshotid>. So we want to sort the
>>> dataframe so that the records always get the same snapshot id.
>>>
>>>
>>>
>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com>
>>> wrote:
>>>
>>> Can you describe your use case more?
>>>
>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>> Hi Guys,
>>>
>>> Why is sorting on streaming dataframes not supported(unless it is
>>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>>
>>> Hemant
>>>
>>>
>>>
>>
>>
>
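
A rough Scala sketch of the partition-plus-offset approach Michael describes
above. The broker address, topic name, and the string encoding of the id are
illustrative, and the spark-sql-kafka connector is assumed to be on the
classpath:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat_ws}

val spark = SparkSession.builder().appName("kafka-stable-ids").getOrCreate()

// The Kafka source schema exposes `partition` and `offset` columns; together
// they form a unique, stable (but non-contiguous) id for every record.
val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .load()
  .withColumn("stable_id",
    concat_ws("-", col("partition").cast("string"), col("offset").cast("string")))

records.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/kafka-ids-checkpoint")
  .start()
  .awaitTermination()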


Re: Sorting on a streaming dataframe

2018-04-24 Thread Hemant Bhanawat
Thanks Chris. There are many ways in which I can solve this problem, but
they are cumbersome. The easiest way would have been to sort the streaming
dataframe. The reason I asked this question is that I could not find an
explanation of why sorting on a streaming dataframe is disallowed.

Hemant

On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <chris.bow...@microfocus.com>
wrote:

> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
> assuming you are willing to implement and maintain your own sink(s). That
> is, just grabbing the parquet sink, etc. isn’t going to work out of the
> box. Alternatively map/flatMapGroupsWithState is probably sufficient and
> requires less working knowledge to make effective reuse of internals. Just
> group by foo and then sort accordingly and assign ids. The id counter can
> be stateful per group. Sometimes this problem may not need to be solved at
> all. For example, if you are using kafka, a proper partitioning scheme and
> message offsets may be “good enough”.
> ------
> *From:* Hemant Bhanawat <hemant9...@gmail.com>
> *Sent:* Thursday, April 12, 2018 11:42:59 PM
> *To:* Reynold Xin
> *Cc:* dev
> *Subject:* Re: Sorting on a streaming dataframe
>
> Well, we want to assign snapshot ids (incrementing counters) to the
> incoming records. For that, we are zipping the streaming rdds with that
> counter using a modified version of ZippedWithIndexRDD. We are ok if the
> records in the streaming dataframe gets counters in random order but the
> counter should always be incrementing.
>
> This is working fine until we have a failure. When we have a failure, we
> re-assign the records to snapshot ids  and this time same snapshot id can
> get assigned to a different record. This is a problem because the primary
> key in our storage engine is <recordid, snapshotid>. So we want to sort the
> dataframe so that the records always get the same snapshot id.
>
>
>
> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com> wrote:
>
> Can you describe your use case more?
>
> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
> Hi Guys,
>
> Why is sorting on streaming dataframes not supported(unless it is complete
> mode)? My downstream needs me to sort the streaming dataframe.
>
> Hemant
>
>
>
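
For later readers, a rough Scala sketch of the flatMapGroupsWithState idea
Chris describes above (the simpler alternative, not the custom-sink
approach); the grouping key, schema, and rate source are illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

val spark = SparkSession.builder().appName("stateful-ids").getOrCreate()
import spark.implicits._

case class Event(key: String, payload: Long)
case class Numbered(key: String, payload: Long, id: Long)

// Illustrative source: the built-in rate source; a Kafka source would work
// the same way. The key here is simply the payload modulo 10.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()
  .select(($"value" % 10).cast("string").as("key"), $"value".as("payload"))
  .as[Event]

// Per-group counter kept in GroupState; it survives across micro-batches and
// is checkpointed together with the query.
def assignIds(key: String, rows: Iterator[Event],
              state: GroupState[Long]): Iterator[Numbered] = {
  var next = state.getOption.getOrElse(0L)
  val out = rows.map { e => next += 1; Numbered(e.key, e.payload, next) }.toList
  state.update(next)
  out.iterator
}

val numbered = events
  .groupByKey(_.key)
  .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.NoTimeout)(assignIds)

numbered.writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/stateful-ids-checkpoint")
  .start()
  .awaitTermination()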


Re: Sorting on a streaming dataframe

2018-04-13 Thread Hemant Bhanawat
Well, we want to assign snapshot ids (incrementing counters) to the
incoming records. For that, we are zipping the streaming rdds with that
counter using a modified version of ZippedWithIndexRDD. We are ok if the
records in the streaming dataframe gets counters in random order but the
counter should always be incrementing.

This is working fine until we have a failure. When we have a failure, we
re-assign the records to snapshot ids  and this time same snapshot id can
get assigned to a different record. This is a problem because the primary
key in our storage engine is <recordid, snapshotid>. So we want to sort the
dataframe so that the records always get the same snapshot id.



On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin <r...@databricks.com> wrote:

> Can you describe your use case more?
>
> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> Why is sorting on streaming dataframes not supported(unless it is
>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>
>> Hemant
>>
>


Sorting on a streaming dataframe

2018-04-13 Thread Hemant Bhanawat
Hi Guys,

Why is sorting on streaming dataframes not supported (unless it is in
complete mode)? My downstream requires me to sort the streaming dataframe.

Hemant


Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path

2017-09-10 Thread Hemant Bhanawat
+1 (non-binding)

I found the suggestion from Andrew Ash and James about plan push-down quite
interesting. However, I am not clear about join push-down support at the
data source level. Shouldn't it be the responsibility of the join node to
carry out a data-source-specific join? I mean, the join node and the data
source scans of the two sides can (theoretically) be coalesced into a single
node. This can be done by providing a Strategy that replaces the join node
with a data-source-specific join node. We are doing it that way for our data
sources. I find this more intuitive.

BTW, aggregate push-down support is desirable and should be considered as an
enhancement going forward.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Sep 10, 2017 at 8:45 PM, vaquar khan <vaquar.k...@gmail.com> wrote:

> +1
>
> Regards,
> Vaquar khan
>
> On Sep 10, 2017 5:18 AM, "Noman Khan" <nomanbp...@live.com> wrote:
>
>> +1
>> --
>> *From:* wangzhenhua (G) <wangzhen...@huawei.com>
>> *Sent:* Friday, September 8, 2017 2:20:07 AM
>> *To:* Dongjoon Hyun; 蒋星博
>> *Cc:* Michael Armbrust; Reynold Xin; Andrew Ash; Herman van Hövell tot
>> Westerflier; Ryan Blue; Spark dev list; Suresh Thalamati; Wenchen Fan
>> *Subject:* Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path
>>
>>
>> +1 (non-binding)  Great to see data source API is going to be improved!
>>
>>
>>
>> best regards,
>>
>> -Zhenhua(Xander)
>>
>>
>>
>> *From:* Dongjoon Hyun [mailto:dongjoon.h...@gmail.com]
>> *Sent:* September 8, 2017, 4:07
>> *To:* 蒋星博
>> *Cc:* Michael Armbrust; Reynold Xin; Andrew Ash; Herman van Hövell tot
>> Westerflier; Ryan Blue; Spark dev list; Suresh Thalamati; Wenchen Fan
>> *Subject:* Re: [VOTE] [SPIP] SPARK-15689: Data Source API V2 read path
>>
>>
>>
>> +1 (non-binding).
>>
>>
>>
>> On Thu, Sep 7, 2017 at 12:46 PM, 蒋星博 <jiangxb1...@gmail.com> wrote:
>>
>> +1
>>
>>
>>
>>
>>
>> Reynold Xin <r...@databricks.com> wrote on Thursday, September 7, 2017, at 12:04 PM:
>>
>> +1 as well
>>
>>
>>
>> On Thu, Sep 7, 2017 at 9:12 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> +1
>>
>>
>>
>> On Thu, Sep 7, 2017 at 9:32 AM, Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>> +1 (non-binding)
>>
>> Thanks for making the updates reflected in the current PR. It would be
>> great to see the doc updated before it is finally published though.
>>
>> Right now it feels like this SPIP is focused more on getting the basics
>> right for what many datasources are already doing in API V1 combined with
>> other private APIs, vs pushing forward state of the art for performance.
>>
>> I think that’s the right approach for this SPIP. We can add the support
>> you’re talking about later with a more specific plan that doesn’t block
>> fixing the problems that this addresses.
>>
>> ​
>>
>>
>>
>> On Thu, Sep 7, 2017 at 2:00 AM, Herman van Hövell tot Westerflier <
>> hvanhov...@databricks.com> wrote:
>>
>> +1 (binding)
>>
>>
>>
>> I personally believe that there is quite a big difference between having
>> a generic data source interface with a low surface area and pushing down a
>> significant part of query processing into a datasource. The later has much
>> wider wider surface area and will require us to stabilize most of the
>> internal catalyst API's which will be a significant burden on the community
>> to maintain and has the potential to slow development velocity
>> significantly. If you want to write such integrations then you should be
>> prepared to work with catalyst internals and own up to the fact that things
>> might change across minor versions (and in some cases even maintenance
>> releases). If you are willing to go down that road, then your best bet is
>> to use the already existing spark session extensions which will allow you
>> to write such integrations and can be used as an `escape hatch`.
>>
>>
>>
>>
>>
>> On Thu, Sep 7, 2017 at 10:23 AM, Andrew Ash <and...@andrewash.com> wrote:
>>
>> +0 (non-binding)
>>
>>
>>
>> I think there are benefits to unifying all the Spark-internal datasources
>> into a common public API for sure.  It will serve as a forcing function to
>> ensure that those internal datasources aren't advantaged vs datasources
>> developed externally as plugins to Spa

Re: How to specify file

2016-09-23 Thread Hemant Bhanawat
Check out the README on the following page. This is the CSV connector that
you are using. I think you need to specify the delimiter option.

https://github.com/databricks/spark-csv

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Sep 23, 2016 at 12:26 PM, Sea <261810...@qq.com> wrote:

> Hi, I want to run sql directly on files, I find that spark has supported
> sql like select * from csv.`/path/to/file`, but files may not be split by
> ','. Maybe it is split by '\001', how can I specify delimiter?
>
> Thank you!
>
>
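
A short Scala sketch of the delimiter option with the spark-csv package
(Spark 1.x era); the path and table name are illustrative, and the spark-csv
package is assumed to be on the classpath:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("csv-delimiter"))
val sqlContext = new SQLContext(sc)

// spark-csv's "delimiter" option makes the reader split on '\001' instead of ','.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\u0001")
  .option("header", "false")
  .load("/path/to/file")

df.registerTempTable("my_table")
sqlContext.sql("SELECT * FROM my_table").show()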
>


Re: CSV Reader with row numbers

2016-09-22 Thread Hemant Bhanawat
zipWithIndex is fine. It will give you unique row IDs across your various
partitions.

You can also use zipWithUniqueId, which avoids the extra job that
zipWithIndex fires. However, there are some differences in how indexes are
assigned to the rows. You can read more about the two APIs in the API
documentation.

https://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.rdd.RDD

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Thu, Sep 15, 2016 at 4:28 AM, Akshay Sachdeva <akshay.sachd...@gmail.com>
wrote:

> Environment:
> Apache Spark 1.6.2
> Scala: 2.10
>
> I am currently using the spark-csv package courtesy of databricks and I
> would like to have a (pre processing ?) stage when reading the CSV file
> that
> also adds a row number to each row of data being read from the csv file.
> This will allow for better traceability and data lineage in case of
> validation or data processing issues downstream.
>
> In doing the research it seems like the zipWithIndex API is the right or
> only way to get this pattern implemented.
>
> Would this be the preferred route?  Would this be safe for parallel
> operations as far as respect no collisions?  Any body have a similar
> requirement and have a better solution you can point me to.
>
> Appreciate any help and responses anyone can offer.
>
> Thanks
> -a
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/CSV-Reader-with-
> row-numbers-tp18946.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
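
A brief Scala sketch contrasting the two RDD APIs discussed above (Spark
1.6-era code; the input path is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("csv-row-numbers"))
val lines = sc.textFile("/path/to/data.csv")

// zipWithIndex: contiguous 0-based indexes across partitions, at the cost of
// an extra job that first counts the records in each partition.
val withRowNumbers = lines.zipWithIndex().map { case (line, idx) => (idx, line) }

// zipWithUniqueId: no extra job; ids are unique but not contiguous
// (the k-th of n partitions gets ids k, n+k, 2n+k, ...).
val withUniqueIds = lines.zipWithUniqueId().map { case (line, id) => (id, line) }

withRowNumbers.take(5).foreach(println)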


Memory usage by Spark jobs

2016-09-22 Thread Hemant Bhanawat
I am working on profiling TPC-H queries for Spark 2.0. I see a lot of
temporary object creation (sometimes as large as the data itself), which is
justified for the kind of processing Spark does. But, from a production
perspective, is there a guideline on how much memory should be allocated for
processing a specific data size of, let's say, Parquet data? Also, has
anyone investigated the memory usage of individual SQL operators like
Filter, GroupBy, OrderBy, Exchange, etc.?

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io


[jira] [Reopened] (SPARK-13693) Flaky test: o.a.s.streaming.MapWithStateSuite

2016-04-24 Thread Hemant Bhanawat (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13693?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hemant Bhanawat reopened SPARK-13693:
-

> Flaky test: o.a.s.streaming.MapWithStateSuite
> -
>
> Key: SPARK-13693
> URL: https://issues.apache.org/jira/browse/SPARK-13693
> Project: Spark
>  Issue Type: Test
>  Components: Tests
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
> Fix For: 2.0.0
>
>
> Fixed the following flaky test:
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/256/testReport/junit/org.apache.spark.streaming/MapWithStateSuite/_It_is_not_a_test_/
> {code}
> sbt.ForkMain$ForkError: java.io.IOException: Failed to delete: 
> /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.7/streaming/checkpoint/spark-e97794a8-b940-4b21-8685-bf1221f9444d
>   at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:934)
>   at 
> org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply$mcV$sp(MapWithStateSuite.scala:47)
>   at 
> org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
>   at 
> org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
> {code}






[jira] [Commented] (SPARK-13693) Flaky test: o.a.s.streaming.MapWithStateSuite

2016-04-24 Thread Hemant Bhanawat (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15255881#comment-15255881
 ] 

Hemant Bhanawat commented on SPARK-13693:
-

Latest Jenkins builds are failing with this issue. See:
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2863
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/2865

> Flaky test: o.a.s.streaming.MapWithStateSuite
> -
>
> Key: SPARK-13693
> URL: https://issues.apache.org/jira/browse/SPARK-13693
> Project: Spark
>  Issue Type: Test
>  Components: Tests
>Reporter: Shixiong Zhu
>Assignee: Shixiong Zhu
>Priority: Minor
> Fix For: 2.0.0
>
>
> Fixed the following flaky test:
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/256/testReport/junit/org.apache.spark.streaming/MapWithStateSuite/_It_is_not_a_test_/
> {code}
> sbt.ForkMain$ForkError: java.io.IOException: Failed to delete: 
> /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.7/streaming/checkpoint/spark-e97794a8-b940-4b21-8685-bf1221f9444d
>   at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:934)
>   at 
> org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply$mcV$sp(MapWithStateSuite.scala:47)
>   at 
> org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
>   at 
> org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
> {code}






[jira] [Created] (SPARK-14729) Implement an existing cluster manager with New ExternalClusterManager interface

2016-04-19 Thread Hemant Bhanawat (JIRA)
Hemant Bhanawat created SPARK-14729:
---

 Summary: Implement an existing cluster manager with New 
ExternalClusterManager interface
 Key: SPARK-14729
 URL: https://issues.apache.org/jira/browse/SPARK-14729
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler
Reporter: Hemant Bhanawat
Priority: Minor


SPARK-13904 adds an ExternalClusterManager interface to Spark to allow external 
cluster managers to spawn Spark components. 

This JIRA tracks following suggestion from [~rxin]: 

'One thing - can you guys try to see if you can implement one of the existing 
cluster managers with this, and then we can make sure this is a proper API? 
Otherwise it is really easy to get removed because it is currently unused by 
anything in Spark.' 








[jira] [Commented] (SPARK-14729) Implement an existing cluster manager with New ExternalClusterManager interface

2016-04-19 Thread Hemant Bhanawat (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247543#comment-15247543
 ] 

Hemant Bhanawat commented on SPARK-14729:
-

I am looking into this. 

> Implement an existing cluster manager with New ExternalClusterManager 
> interface
> ---
>
> Key: SPARK-14729
> URL: https://issues.apache.org/jira/browse/SPARK-14729
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>    Reporter: Hemant Bhanawat
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> SPARK-13904 adds an ExternalClusterManager interface to Spark to allow 
> external cluster managers to spawn Spark components. 
> This JIRA tracks following suggestion from [~rxin]: 
> 'One thing - can you guys try to see if you can implement one of the existing 
> cluster managers with this, and then we can make sure this is a proper API? 
> Otherwise it is really easy to get removed because it is currently unused by 
> anything in Spark.' 






[jira] [Commented] (SPARK-13904) Add support for pluggable cluster manager

2016-04-18 Thread Hemant Bhanawat (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247096#comment-15247096
 ] 

Hemant Bhanawat commented on SPARK-13904:
-

[~kiszk] Since the builds are passing now, can I assume that it was some 
sporadic issue and close this JIRA?

> Add support for pluggable cluster manager
> -
>
> Key: SPARK-13904
> URL: https://issues.apache.org/jira/browse/SPARK-13904
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>    Reporter: Hemant Bhanawat
>
> Currently Spark allows only a few cluster managers viz Yarn, Mesos and 
> Standalone. But, as Spark is now being used in newer and different use cases, 
> there is a need for allowing other cluster managers to manage spark 
> components. One such use case is - embedding spark components like executor 
> and driver inside another process which may be a datastore. This allows 
> colocation of data and processing. Another requirement that stems from such a 
> use case is that the executors/driver should not take the parent process down 
> when they go down and the components can be relaunched inside the same 
> process again. 
> So, this JIRA requests two functionalities:
> 1. Support for external cluster managers
> 2. Allow a cluster manager to clean up the tasks without taking the parent 
> process down. 






[jira] [Commented] (SPARK-13904) Add support for pluggable cluster manager

2016-04-18 Thread Hemant Bhanawat (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245581#comment-15245581
 ] 

Hemant Bhanawat commented on SPARK-13904:
-

I ran the following command on my machine:
build/sbt test:compile -Pyarn -Phadoop-2.6 -Phive -Pkinesis-asl 
-Phive-thriftserver test 
but org.apache.spark.sql.hive.HiveSparkSubmitSuite passed. 

I also reviewed my ExternalClusterManagerSuite code to ensure that I am 
stopping the SparkContext properly. Any ideas on what I should be looking at? 

> Add support for pluggable cluster manager
> -
>
> Key: SPARK-13904
> URL: https://issues.apache.org/jira/browse/SPARK-13904
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>    Reporter: Hemant Bhanawat
>
> Currently Spark allows only a few cluster managers viz Yarn, Mesos and 
> Standalone. But, as Spark is now being used in newer and different use cases, 
> there is a need for allowing other cluster managers to manage spark 
> components. One such use case is - embedding spark components like executor 
> and driver inside another process which may be a datastore. This allows 
> colocation of data and processing. Another requirement that stems from such a 
> use case is that the executors/driver should not take the parent process down 
> when they go down and the components can be relaunched inside the same 
> process again. 
> So, this JIRA requests two functionalities:
> 1. Support for external cluster managers
> 2. Allow a cluster manager to clean up the tasks without taking the parent 
> process down. 






[jira] [Commented] (SPARK-13904) Add support for pluggable cluster manager

2016-04-17 Thread Hemant Bhanawat (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245118#comment-15245118
 ] 

Hemant Bhanawat commented on SPARK-13904:
-

[~kiszk] I am looking into this. 

> Add support for pluggable cluster manager
> -
>
> Key: SPARK-13904
> URL: https://issues.apache.org/jira/browse/SPARK-13904
> Project: Spark
>  Issue Type: Improvement
>  Components: Scheduler
>    Reporter: Hemant Bhanawat
>
> Currently Spark allows only a few cluster managers viz Yarn, Mesos and 
> Standalone. But, as Spark is now being used in newer and different use cases, 
> there is a need for allowing other cluster managers to manage spark 
> components. One such use case is - embedding spark components like executor 
> and driver inside another process which may be a datastore. This allows 
> colocation of data and processing. Another requirement that stems from such a 
> use case is that the executors/driver should not take the parent process down 
> when they go down and the components can be relaunched inside the same 
> process again. 
> So, this JIRA requests two functionalities:
> 1. Support for external cluster managers
> 2. Allow a cluster manager to clean up the tasks without taking the parent 
> process down. 






Re: How to process one partition at a time?

2016-04-06 Thread Hemant Bhanawat
Apparently, there is another way to do it. You can try creating a
PartitionPruningRDD and passing a partition filter function to it. This RDD
will do the same thing that I suggested in my mail, and you will not have to
create a new RDD class yourself.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Wed, Apr 6, 2016 at 5:35 PM, Sun, Rui <rui@intel.com> wrote:

> Maybe you can try SparkContext.submitJob:
>
> def submitJob[T, U, R](rdd: RDD[T], processPartition: (Iterator[T]) => U,
>     partitions: Seq[Int], resultHandler: (Int, U) => Unit,
>     resultFunc: => R): SimpleFutureAction[R]
>
>
>
>
>
> *From:* Hemant Bhanawat [mailto:hemant9...@gmail.com]
> *Sent:* Wednesday, April 6, 2016 7:16 PM
> *To:* Andrei <faithlessfri...@gmail.com>
> *Cc:* user <user@spark.apache.org>
> *Subject:* Re: How to process one partition at a time?
>
>
>
> Instead of doing it in compute, you could rather override getPartitions
> method of your RDD and return only the target partitions. This way tasks
> for only target partitions will be created. Currently in your case, tasks
> for all the partitions are getting created.
>
> I hope it helps. I would like to hear if you take some other approach.
>
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
>
> www.snappydata.io
>
>
>
> On Wed, Apr 6, 2016 at 3:49 PM, Andrei <faithlessfri...@gmail.com> wrote:
>
> I'm writing a kind of sampler which in most cases will require only 1
> partition, sometimes 2 and very rarely more. So it doesn't make sense to
> process all partitions in parallel. What is the easiest way to limit
> computations to one partition only?
>
>
>
> So far the best idea I came to is to create a custom partition whose
> `compute` method looks something like:
>
>
>
> def compute(split: Partition, context: TaskContext) = {
>
> if (split.index == targetPartition) {
>
> // do computation
>
> } else {
>
>// return empty iterator
>
> }
>
> }
>
>
>
>
>
> But it's quite ugly and I'm unlikely to be the first person with such a
> need. Is there easier way to do it?
>
>
>
>
>
>
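
A minimal Scala sketch of the PartitionPruningRDD suggestion above (a
developer API; the data and the target partition index are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.PartitionPruningRDD

val sc = new SparkContext(new SparkConf().setAppName("single-partition"))
val data = sc.parallelize(1 to 1000000, numSlices = 8)

// Only tasks for partitions passing the filter are scheduled; here we keep
// just the hypothetical target partition with index 3.
val targetPartition = 3
val pruned = PartitionPruningRDD.create(data, idx => idx == targetPartition)

println(pruned.count())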
>


Re: Executor shutdown hooks?

2016-04-06 Thread Hemant Bhanawat
As part of PR https://github.com/apache/spark/pull/11723, I have added a
killAllTasks function that can be used to kill (or rather, interrupt)
individual tasks before an executor exits. If this PR is accepted, we can
add a call to this function before the executor exits to perform task-level
cleanups. The exit thread will wait for a certain period of time before the
executor JVM exits to allow proper cleanup of the tasks.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Thu, Apr 7, 2016 at 6:08 AM, Reynold Xin <r...@databricks.com> wrote:

>
> On Wed, Apr 6, 2016 at 4:39 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
> wrote:
>
>> My option so far seems to be using JVM's shutdown hook, but I was
>> wondering if Spark itself had an API for tasks.
>>
>
> Spark would be using that under the hood anyway, so you might as well just
> use the jvm shutdown hook directly.
>
>
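
A small Scala sketch of the suggestion above: registering a plain JVM
shutdown hook from executor-side code. It assumes a compiled application;
the ExecutorCleanup object and the cleanup body are illustrative:

import java.util.concurrent.atomic.AtomicBoolean

import org.apache.spark.{SparkConf, SparkContext}

// One flag per executor JVM so the hook is registered only once.
object ExecutorCleanup {
  private val installed = new AtomicBoolean(false)

  def installHook(): Unit = {
    if (installed.compareAndSet(false, true)) {
      Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
        override def run(): Unit = {
          // Close connections, flush buffers, etc.
          println("executor JVM shutting down, cleaning up")
        }
      }))
    }
  }
}

val sc = new SparkContext(new SparkConf().setAppName("shutdown-hook-demo"))
sc.parallelize(1 to 100, 4).mapPartitions { iter =>
  ExecutorCleanup.installHook() // runs on the executor that owns this partition
  iter
}.count()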


Re: How to process one partition at a time?

2016-04-06 Thread Hemant Bhanawat
Instead of doing it in compute, you could override the getPartitions method
of your RDD and return only the target partitions. This way, tasks will be
created only for the target partitions. Currently, in your case, tasks are
being created for all the partitions.

I hope this helps. I would like to hear if you take some other approach.


Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Wed, Apr 6, 2016 at 3:49 PM, Andrei <faithlessfri...@gmail.com> wrote:

> I'm writing a kind of sampler which in most cases will require only 1
> partition, sometimes 2 and very rarely more. So it doesn't make sense to
> process all partitions in parallel. What is the easiest way to limit
> computations to one partition only?
>
> So far the best idea I came to is to create a custom partition whose
> `compute` method looks something like:
>
> def compute(split: Partition, context: TaskContext) = {
> if (split.index == targetPartition) {
> // do computation
> } else {
>// return empty iterator
> }
> }
>
>
>
> But it's quite ugly and I'm unlikely to be the first person with such a
> need. Is there easier way to do it?
>
>
>


Re: how about a custom coalesce() policy?

2016-04-02 Thread Hemant Bhanawat
correcting email id for Nezih

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Apr 3, 2016 at 11:09 AM, Hemant Bhanawat <hemant9...@gmail.com>
wrote:

> Hi Nezih,
>
> Can you share JIRA and PR numbers?
>
> This partial de-coupling of data partitioning strategy and spark
> parallelism would be a useful feature for any data store.
>
> Hemant
>
> Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
> www.snappydata.io
>
> On Fri, Apr 1, 2016 at 10:33 PM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hey Reynold,
>> Created an issue (and a PR) for this change to get discussions started.
>>
>> Thanks,
>> Nezih
>>
>> On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <r...@databricks.com> wrote:
>>
>>> Using the right email for Nezih
>>>
>>>
>>> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <r...@databricks.com>
>>> wrote:
>>>
>>>> I think this can be useful.
>>>>
>>>> The only thing is that we are slowly migrating to the Dataset/DataFrame
>>>> API, and leave RDD mostly as is as a lower level API. Maybe we should do
>>>> both? In either case it would be great to discuss the API on a pull
>>>> request. Cheers.
>>>>
>>>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>>>> nyigitb...@netflix.com.invalid> wrote:
>>>>
>>>>> Hi Spark devs,
>>>>>
>>>>> I have sent an email about my problem some time ago where I want to
>>>>> merge a large number of small files with Spark. Currently I am using Hive
>>>>> with the CombineHiveInputFormat and I can control the size of the
>>>>> output files with the max split size parameter (which is used for
>>>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>>>> attempt was to use coalesce(), but since coalesce only considers the
>>>>> target number of partitions the output file sizes were varying wildly.
>>>>>
>>>>> What I think can be useful is to have an optional PartitionCoalescer
>>>>> parameter (a new interface) in the coalesce() method (or maybe we can
>>>>> add a new method ?) that the callers can implement for custom coalescing
>>>>> strategies — for my use case I have already implemented a
>>>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>>>> their sizes and by using a max split size parameter, similar to the
>>>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access
>>>>> to the individual split sizes etc.).
>>>>>
>>>>> What do you guys think about such a change, can it be useful to other
>>>>> users as well? Or do you think that there is an easier way to accomplish
>>>>> the same merge logic? If you think it may be useful, I already have
>>>>> an implementation and I will be happy to work with the community to
>>>>> contribute it.
>>>>>
>>>>> Thanks,
>>>>> Nezih
>>>>> ​
>>>>>
>>>>
>>>>
>>>
>


Re: how about a custom coalesce() policy?

2016-04-02 Thread Hemant Bhanawat
Hi Nezih,

Can you share JIRA and PR numbers?

This partial de-coupling of data partitioning strategy and spark
parallelism would be a useful feature for any data store.

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Apr 1, 2016 at 10:33 PM, Nezih Yigitbasi <
nyigitb...@netflix.com.invalid> wrote:

> Hey Reynold,
> Created an issue (and a PR) for this change to get discussions started.
>
> Thanks,
> Nezih
>
> On Fri, Feb 26, 2016 at 12:03 AM Reynold Xin <r...@databricks.com> wrote:
>
>> Using the right email for Nezih
>>
>>
>> On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin <r...@databricks.com>
>> wrote:
>>
>>> I think this can be useful.
>>>
>>> The only thing is that we are slowly migrating to the Dataset/DataFrame
>>> API, and leave RDD mostly as is as a lower level API. Maybe we should do
>>> both? In either case it would be great to discuss the API on a pull
>>> request. Cheers.
>>>
>>> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
>>> nyigitb...@netflix.com.invalid> wrote:
>>>
>>>> Hi Spark devs,
>>>>
>>>> I have sent an email about my problem some time ago where I want to
>>>> merge a large number of small files with Spark. Currently I am using Hive
>>>> with the CombineHiveInputFormat and I can control the size of the
>>>> output files with the max split size parameter (which is used for
>>>> coalescing the input splits by the CombineHiveInputFormat). My first
>>>> attempt was to use coalesce(), but since coalesce only considers the
>>>> target number of partitions the output file sizes were varying wildly.
>>>>
>>>> What I think can be useful is to have an optional PartitionCoalescer
>>>> parameter (a new interface) in the coalesce() method (or maybe we can
>>>> add a new method ?) that the callers can implement for custom coalescing
>>>> strategies — for my use case I have already implemented a
>>>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>>>> their sizes and by using a max split size parameter, similar to the
>>>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access
>>>> to the individual split sizes etc.).
>>>>
>>>> What do you guys think about such a change, can it be useful to other
>>>> users as well? Or do you think that there is an easier way to accomplish
>>>> the same merge logic? If you think it may be useful, I already have an
>>>> implementation and I will be happy to work with the community to contribute
>>>> it.
>>>>
>>>> Thanks,
>>>> Nezih
>>>> ​
>>>>
>>>
>>>
>>


Re: SPARK-13900 - Join with simple OR conditions take too long

2016-04-01 Thread Hemant Bhanawat
As Mich has already noticed, Spark defaults to a nested loop (NL) join if
there is more than one condition. Oracle is probably doing cost-based
optimization in this scenario. You can call it a bug, but in my opinion it
is an area where Spark is still evolving.

>> Hemant has mentioned the nested loop time will be very little.
I had mentioned that the NL time will *vary* little as the number of
conditions grows. What I meant was that with 15 conditions instead of 3, the
NL join would still take 13-15 minutes, while the hash join would take more
than that.

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Apr 1, 2016 at 3:08 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi Mich,
>
> Thanks for the input.
>
> Yes, it seems to be a bug. Is it possible to fix this in next release?
>
> Regards
> Ashok
>
> On Fri, Apr 1, 2016 at 2:06 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> > wrote:
>
>> hm.
>>
>> Sounds like it ends up in Nested Loop Join (NLJ) as opposed to Hash Join
>> (HJ) when OR  is used for more than one predicate comparison.
>>
>> In below I have a table dummy created as ORC with 1 billion rows. Just
>> created another one called dummy1 with 60K rows
>>
>> A simple join results in Hash Join good!
>>
>> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
>> d.random_string = d1.random_string").explain(true)
>>
>> == Physical Plan ==
>> Project [id#212,id#219]
>>
>> +- BroadcastHashJoin [random_string#216], [random_string#223], BuildRight
>>    :- ConvertToUnsafe
>>:  +- HiveTableScan [id#212,random_string#216], MetastoreRelation
>> test, dummy, Some(d)
>>+- ConvertToUnsafe
>>   +- HiveTableScan [id#219,random_string#223], MetastoreRelation
>> test, dummy2, Some(d1)
>>
>> When the join is done using OR on other predicates I see it starts doing
>> NLJ
>>
>> scala> sql("select d.id, d1.id from dummy d, dummy2 d1 where
>> d.random_string = d1.random_string OR d.small_vc =
>> d1.small_vc").explain(true)
>>
>> == Physical Plan ==
>> Project [id#241,id#248]
>> +- BroadcastNestedLoopJoin BuildRight, Inner, Some(((random_string#245
>> = random_string#252) || (small_vc#246 = small_vc#253)))
>>:- HiveTableScan [small_vc#246,id#241,random_string#245],
>> MetastoreRelation test, dummy, Some(d)
>>+- HiveTableScan [id#248,random_string#252,small_vc#253],
>> MetastoreRelation test, dummy2, Some(d1)
>>
>> in contrast the same identical tables in Oracle use Hash Join with OR
>> which is expected
>>
>> scratch...@mydb.mich.LOCAL> select d.id, d1.id from dummy d, dummy2 d1
>> where d.random_string = d1.random_string OR d.small_vc = d1.small_vc;
>>
>> Execution Plan
>> --
>> Plan hash value: 4163534687
>>
>> --
>> | Id  | Operation   | Name   | Rows  | Bytes |TempSpc| Cost
>> (%CPU)| Time |
>>
>> --
>> |   0 | SELECT STATEMENT|| 63207 |  8332K|   |  1280K
>> (1)| 04:16:05 |
>> |   1 |  CONCATENATION  ||   |   |   |
>> |  |
>> |*  2 |   HASH JOIN || 60183 |  7934K|  4632K|   640K
>> (1)| 02:08:03 |
>> |   3 |TABLE ACCESS FULL| DUMMY2 | 6 |  3925K|   |   157
>> (1)| 00:00:02 |
>> |   4 |TABLE ACCESS FULL| DUMMY  |   100M|  6484M|   |   261K
>> (1)| 00:52:13 |
>> |*  5 |   HASH JOIN ||  3024 |   398K|  4632K|   640K
>> (1)| 02:08:03 |
>> |   6 |TABLE ACCESS FULL| DUMMY2 | 6 |  3925K|   |   157
>> (1)| 00:00:02 |
>> |   7 |TABLE ACCESS FULL| DUMMY  |   100M|  6484M|   |   261K
>> (1)| 00:52:13 |
>>
>> --
>>
>> So this looks like a bug!
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 1 April 2016 at 04:53, ashokkumar rajendran <
>> ashokkumar.rajend...@gmail.com> wrote:
>>
>>>

Re: SPARK-13900 - Join with simple OR conditions take too long

2016-03-31 Thread Hemant Bhanawat
Hi Ashok,

That's interesting.

As I understand it, a nested loop join (which produces m x n rows) is
performed on tables A and B, and then each row is evaluated to see whether
any of the conditions is met. You are asking that Spark should instead do a
BroadcastHashJoin on each equality condition in parallel and then union the
results, like you are doing in a different query.

If we leave parallelism aside for a moment, theoretically, the time taken by
a nested loop join would vary little as the number of conditions increases,
while the time taken by the solution you are suggesting would increase
linearly with the number of conditions. So, when there are too many
conditions, the nested loop join would be faster than the solution you
suggest. Now the question is, how should Spark decide when to do what?


Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Thu, Mar 31, 2016 at 2:28 PM, ashokkumar rajendran <
ashokkumar.rajend...@gmail.com> wrote:

> Hi,
>
> I have filed ticket SPARK-13900. There was an initial reply from a
> developer but did not get any reply on this. How can we do multiple hash
> joins together for OR conditions based joins? Could someone please guide on
> how can we fix this?
>
> Regards
> Ashok
>
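
To make the rewrite being discussed concrete, a rough Scala sketch written
against the Spark 2.x DataFrame API; the dummy/dummy2 tables follow Mich's
example in the thread above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("or-join-rewrite").getOrCreate()

val a = spark.table("dummy")
val b = spark.table("dummy2")

// Original form: planned as a BroadcastNestedLoopJoin.
val orJoin = a.join(b,
  a("random_string") === b("random_string") || a("small_vc") === b("small_vc"))

// Manual rewrite: two equi-joins (hash joins) unioned together. The second
// branch excludes rows already matched by the first condition so the result
// stays equivalent, ignoring NULL-handling subtleties (Oracle's CONCATENATION
// plan uses LNNVL for the same purpose).
val first  = a.join(b, a("random_string") === b("random_string"))
val second = a.join(b,
  a("small_vc") === b("small_vc") && !(a("random_string") === b("random_string")))
val rewritten = first.union(second)

rewritten.explain(true)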


[jira] [Created] (SPARK-13904) Add support for pluggable cluster manager

2016-03-15 Thread Hemant Bhanawat (JIRA)
Hemant Bhanawat created SPARK-13904:
---

 Summary: Add support for pluggable cluster manager
 Key: SPARK-13904
 URL: https://issues.apache.org/jira/browse/SPARK-13904
 Project: Spark
  Issue Type: Improvement
  Components: Scheduler
Reporter: Hemant Bhanawat


Currently Spark allows only a few cluster managers, viz. YARN, Mesos and 
Standalone. But, as Spark is now being used in newer and different use cases, 
there is a need for allowing other cluster managers to manage Spark components. 
One such use case is embedding Spark components like the executor and driver 
inside another process, which may be a datastore. This allows colocation of data 
and processing. Another requirement that stems from such a use case is that the 
executors/driver should not take the parent process down when they go down, and 
that the components can be relaunched inside the same process again. 

So, this JIRA requests two functionalities:
1. Support for external cluster managers
2. Allow a cluster manager to clean up the tasks without taking the parent 
process down. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



Re: Can we use spark inside a web service?

2016-03-11 Thread Hemant Bhanawat
Spark-jobserver is an elegant product that builds concurrency on top of
Spark. But the current design of the DAGScheduler prevents Spark from becoming
a truly concurrent solution for low latency queries; the DAGScheduler will turn
out to be a bottleneck for such queries. The Sparrow project was an
effort to make Spark more suitable for these scenarios, but it never made it
into the Spark codebase. If Spark has to become a highly concurrent solution,
scheduling has to be distributed.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly <ch...@fregly.com> wrote:

> great discussion, indeed.
>
> Mark Hamstra and i spoke offline just now.
>
> Below is a quick recap of our discussion on how they've achieved
> acceptable performance from Spark on the user request/response path (@mark-
> feel free to correct/comment).
>
> 1) there is a big difference in request/response latency between
> submitting a full Spark Application (heavy weight) versus having a
> long-running Spark Application (like Spark Job Server) that submits
> lighter-weight Jobs using a shared SparkContext.  mark is obviously using
> the latter - a long-running Spark App.
>
> 2) there are some enhancements to Spark that are required to achieve
> acceptable user request/response times.  some links that Mark provided are
> as follows:
>
>- https://issues.apache.org/jira/browse/SPARK-11838
>- https://github.com/apache/spark/pull/11036
>- https://github.com/apache/spark/pull/11403
>- https://issues.apache.org/jira/browse/SPARK-13523
>- https://issues.apache.org/jira/browse/SPARK-13756
>
> Essentially, a deeper level of caching at the shuffle file layer to reduce
> compute and memory between queries.
>
> Note that Mark is running a slightly-modified version of stock Spark.
>  (He's mentioned this in prior posts, as well.)
>
> And I have to say that I'm, personally, seeing more and more
> slightly-modified versions of Spark being deployed to production to
> workaround outstanding PR's and Jiras.
>
> this may not be what people want to hear, but it's a trend that i'm seeing
> lately as more and more customize Spark to their specific use cases.
>
> Anyway, thanks for the good discussion, everyone!  This is why we have
> these lists, right!  :)
>
>
> On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan <velvia.git...@gmail.com>
> wrote:
>
>> One of the premises here is that if you can restrict your workload to
>> fewer cores - which is easier with FiloDB and careful data modeling -
>> you can make this work for much higher concurrency and lower latency
>> than most typical Spark use cases.
>>
>> The reason why it typically does not work in production is that most
>> people are using HDFS and files.  These data sources are designed for
>> running queries and workloads on all your cores across many workers,
>> and not for filtering your workload down to only one or two cores.
>>
>> There is actually nothing inherent in Spark that prevents people from
>> using it as an app server.   However, the insistence on using it with
>> HDFS is what kills concurrency.   This is why FiloDB is important.
>>
>> I agree there are more optimized stacks for running app servers, but
>> the choices that you mentioned:  ES is targeted at text search;  Cass
>> and HBase by themselves are not fast enough for analytical queries
>> that the OP wants;  and MySQL is great but not scalable.   Probably
>> something like VectorWise, HANA, Vertica would work well, but those
>> are mostly not free solutions.   Druid could work too if the use case
>> is right.
>>
>> Anyways, great discussion!
>>
>> On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly <ch...@fregly.com> wrote:
>> > you are correct, mark.  i misspoke.  apologies for the confusion.
>> >
>> > so the problem is even worse given that a typical job requires multiple
>> > tasks/cores.
>> >
>> > i have yet to see this particular architecture work in production.  i
>> would
>> > love for someone to prove otherwise.
>> >
>> > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra <m...@clearstorydata.com>
>> > wrote:
>> >>>
>> >>> For example, if you're looking to scale out to 1000 concurrent
>> requests,
>> >>> this is 1000 concurrent Spark jobs.  This would require a cluster
>> with 1000
>> >>> cores.
>> >>
>> >>
>> >> This doesn't make sense.  A Spark Job is a driver/DAGScheduler concept
>> >> without any 1:1 correspondence between Worker cores and J

Re: S3 Zip File Loading Advice

2016-03-08 Thread Hemant Bhanawat
https://issues.apache.org/jira/browse/SPARK-3586 talks about creating a
file dstream which can monitor for new files recursively but this
functionality is not yet added.

I don't see an easy way out. You will have to create your folders based on a
timeline (it looks like you are already doing that) and run a new job over
the new folders created in each interval. This will have to be automated
using an external script.

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Wed, Mar 9, 2016 at 10:33 AM, Benjamin Kim <bbuil...@gmail.com> wrote:

> I am wondering if anyone can help.
>
> Our company stores zipped CSV files in S3, which has been a big headache
> from the start. I was wondering if anyone has created a way to iterate
> through several subdirectories (s3n://events/2016/03/01/00,
> s3n://2016/03/01/01, etc.) in S3 to find the newest files and load them. It
> would be a big bonus to include the unzipping of the file in the process so
> that the CSV can be loaded directly into a dataframe for further
> processing. I’m pretty sure that the S3 part of this request is not
> uncommon. I would think the file being zipped is uncommon. If anyone can
> help, I would truly be grateful for I am new to Scala and Spark. This would
> be a great help in learning.
>
> Thanks,
> Ben
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming - graceful shutdown when stream has no more data

2016-02-23 Thread Hemant Bhanawat
A guess - parseRecord is returning None in some cases (probably empty lines),
and then entry.get is throwing the exception.

You may want to filter the None values from accessLogDStream before you run
the map function over it.
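
A sketch of the suggested change, applied to the countByIp function from the
code quoted below (AccessLogParser is the parser class from that code; untested):

import org.apache.spark.streaming.dstream.DStream

def countByIp(lines: DStream[String]) = {
  val parser = new AccessLogParser
  val accessLogDStream = lines.map(line => parser.parseRecord(line))
  // Drop records that failed to parse (None) before mapping, so that
  // entry.get can no longer throw java.util.NoSuchElementException.
  val ipDStream = accessLogDStream
    .filter(_.isDefined)
    .map(entry => (entry.get.clientIpAddress, 1))
  ipDStream.reduceByKey((x, y) => x + y)
}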

Hemant

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Tue, Feb 23, 2016 at 6:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Which line is line 42 in your code ?
>
> When variable lines becomes empty, you can stop your program.
>
> Cheers
>
> On Feb 23, 2016, at 12:25 AM, Femi Anthony <femib...@gmail.com> wrote:
>
> I am working on Spark Streaming API and I wish to stream a set of
> pre-downloaded web log files continuously to simulate a real-time stream. I
> wrote a script that gunzips the compressed logs and pipes the output to nc
> on port .
>
> The script looks like this:
>
> BASEDIR=/home/mysuer/data/datamining/internet_traffic_archive
> zipped_files=`find $BASEDIR -name "*.gz"`
>
> for zfile in $zipped_files
>  do
>   echo "Unzipping $zfile..."
>   gunzip -c $zfile  | nc -l -p  -q 20
>
>  done
>
> I have streaming code written in Scala that processes the streams. It
> works well for the most part, but when its run out of files to stream I get
> the following error in Spark:
>
> 16/02/19 23:04:35 WARN ReceiverSupervisorImpl:
> Restarting receiver with delay 2000 ms: Socket data stream had no more data
> 16/02/19 23:04:35 ERROR ReceiverTracker: Deregistered receiver for stream 0:
> Restarting receiver with delay 2000ms: Socket data stream had no more data
> 16/02/19 23:04:35 WARN BlockManager: Block input-0-1455941075600 replicated 
> to only 0 peer(s) instead of 1 peers
> 
> 16/02/19 23:04:40 ERROR Executor: Exception in task 2.0 in stage 15.0 (TID 47)
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
> at 
> com.femibyte.learningsparkaddexamples.scala.StreamingLogEnhanced$$anonfun$2.apply(StreamingLogEnhanced.scala:42)
>
> How to I implement a graceful shutdown so that the program exits
> gracefully when it no longer detects any data in the stream ?
>
> My Spark Streaming code looks like this:
>
> object StreamingLogEnhanced {
>  def main(args: Array[String]) {
>   val master = args(0)
>   val conf = new
>  SparkConf().setMaster(master).setAppName("StreamingLogEnhanced")
>  // Create a StreamingContext with a n second batch size
>   val ssc = new StreamingContext(conf, Seconds(10))
>  // Create a DStream from all the input on port 
>   val log = Logger.getLogger(getClass.getName)
>
>   sys.ShutdownHookThread {
>   log.info("Gracefully stopping Spark Streaming Application")
>   ssc.stop(true, true)
>   log.info("Application stopped")
>   }
>   val lines = ssc.socketTextStream("localhost", )
>   // Create a count of log hits by ip
>   var ipCounts=countByIp(lines)
>   ipCounts.print()
>
>   // start our streaming context and wait for it to "finish"
>   ssc.start()
>   // Wait for 600 seconds then exit
>   ssc.awaitTermination(1*600)
>   ssc.stop()
>   }
>
>  def countByIp(lines: DStream[String]) = {
>val parser = new AccessLogParser
>val accessLogDStream = lines.map(line => parser.parseRecord(line))
>val ipDStream = accessLogDStream.map(entry =>
> (entry.get.clientIpAddress, 1))
>ipDStream.reduceByKey((x, y) => x + y)
>  }
>
> }
>
> Thanks for any suggestions in advance.
>
>


Re: Specify number of executors in standalone cluster mode

2016-02-21 Thread Hemant Bhanawat
The maximum number of cores per executor can be controlled using
spark.executor.cores, and the number of worker instances on a single machine
is controlled by the environment variable SPARK_WORKER_INSTANCES.

However, to ensure that all available cores are used, you will have to take
care of how the stream is partitioned. Copy-pasting the relevant help text
from the Spark documentation:



The number of tasks per receiver per batch will be approximately (batch
interval / block interval). For example, block interval of 200 ms will
create 10 tasks per 2 second batches. If the number of tasks is too low
(that is, less than the number of cores per machine), then it will be
inefficient as all available cores will not be used to process the data. To
increase the number of tasks for a given batch interval, reduce the block
interval. However, the recommended minimum value of block interval is about
50 ms, below which the task launching overheads may be a problem.

An alternative to receiving data with multiple input streams / receivers is to
explicitly repartition the input data stream (using
inputStream.repartition()). This distributes the
received batches of data across the specified number of machines in the
cluster before further processing.
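
For illustration, a rough sketch (untested, values are placeholders) of how the
two knobs and an explicit repartition fit together for the 6-machine,
2-cores-per-machine case:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each executor at 2 cores and the application at 6 x 2 = 12 cores in
// total, then spread the received batches over 12 tasks.
val conf = new SparkConf()
  .setAppName("TwoCoresPerMachine")
  .set("spark.executor.cores", "2")
  .set("spark.cores.max", "12")

val ssc = new StreamingContext(conf, Seconds(2))
// Host and port are placeholders for whatever source the application reads from.
val lines = ssc.socketTextStream("source-host", 9999)
val spread = lines.repartition(12)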

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Feb 21, 2016 at 11:01 PM, Saiph Kappa <saiph.ka...@gmail.com> wrote:

> Hi,
>
> I'm running a spark streaming application onto a spark cluster that spans
> 6 machines/workers. I'm using spark cluster standalone mode. Each machine
> has 8 cores. Is there any way to specify that I want to run my application
> on all 6 machines and just use 2 cores on each machine?
>
> Thanks
>


Re: Behind the scene of RDD to DataFrame

2016-02-20 Thread Hemant Bhanawat
toDF internally calls sqlContext.createDataFrame, which transforms the RDD
into an RDD[InternalRow]. This RDD[InternalRow] is then wrapped in a DataFrame.

Type conversions (from Scala types to Catalyst types) are involved, but no
shuffling.
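
A small spark-shell illustration of this (untested):

// toDF converts each object to Catalyst types in place; the physical plan
// shows only a scan of the existing RDD and no Exchange (shuffle).
case class Person(name: String, age: Int)

import sqlContext.implicits._
val rdd = sc.parallelize(Seq(Person("a", 30), Person("b", 25)))
val df  = rdd.toDF()
df.explain(true)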

Hemant Bhanawat <https://www.linkedin.com/in/hemant-bhanawat-92a3811>
www.snappydata.io

On Sun, Feb 21, 2016 at 11:48 AM, Weiwei Zhang <wzhan...@dons.usfca.edu>
wrote:

> Hi there,
>
> Could someone explain to me what is behind the scene of rdd.toDF()? More
> importantly, will this step involve a lot of shuffles and cause the surge
> of the size of intermediate files? Thank you.
>
> Best Regards,
> Vivian
>


Re: spark stages in parallel

2016-02-20 Thread Hemant Bhanawat
Not possible as of today. See
https://issues.apache.org/jira/browse/SPARK-2387

Hemant Bhanawat
https://www.linkedin.com/in/hemant-bhanawat-92a3811
www.snappydata.io

On Thu, Feb 18, 2016 at 1:19 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> can two stages of single job run in parallel in spark?
>
> e.g one stage is ,map transformation and another is repartition on mapped
> rdd.
>
> rdd.map(function,100).repartition(30);
>
> can it happen that map transformation which is running 100 tasks after few
> of them say (10 )  are finished and spark started another stage repartition
> which started copying data from mapped stage nodes in parallel.
>
> Thanks
>


Re: Optimal way to re-partition from a single partition

2016-02-09 Thread Hemant Bhanawat
For SQL shuffle operations like groupBy, the number of output partitions is
controlled by spark.sql.shuffle.partitions. But it seems orderBy does not
honour this.

In my small test, I could see that the number of partitions in the DF returned
by orderBy was equal to the total number of distinct keys. Are you observing
the same? I mean, do you have a single value for all rows in the column on
which you are running orderBy? If yes, you are better off not running the
orderBy clause at all.

Maybe someone from the Spark SQL team could answer how the partitioning of
the output DF should be handled when doing an orderBy.

Hemant
www.snappydata.io
https://github.com/SnappyDataInc/snappydata




On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores  wrote:

>
> I have a data frame which I sort using orderBy function. This operation
> causes my data frame to go to a single partition. After using those
> results, I would like to re-partition to a larger number of partitions.
> Currently I am just doing:
>
> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
> partition and around 14 million records
> val newDF =  hc.createDataFrame(rdd, df.schema)
>
> This process is really slow. Is there any other way of achieving this
> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>
>
> Thanks a lot
> --
> Cesar Flores
>


Re: Optimal way to re-partition from a single partition

2016-02-09 Thread Hemant Bhanawat
Ohk. I was comparing groupBy with orderBy and now I realize that they are
using different partitioning schemes.

Thanks Takeshi.



On Tue, Feb 9, 2016 at 9:09 PM, Takeshi Yamamuro <linguin@gmail.com>
wrote:

> Hi,
>
> DataFrame#sort() uses `RangePartitioning` in `Exchange` instead of
> `HashPartitioning`.
> `RangePartitioning` roughly samples input data and internally computes
> partition bounds
> to split given rows into `spark.sql.shuffle.partitions` partitions.
> Therefore, when sort keys are highly skewed, I think some partitions could
> end up being empty
> (that is, # of result partitions is lower than `spark.sql.shuffle.partitions`).
>
>
> On Tue, Feb 9, 2016 at 9:35 PM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> For sql shuffle operations like groupby, the number of output partitions
>> is controlled by spark.sql.shuffle.partitions. But, it seems orderBy does
>> not honour this.
>>
>> In my small test, I could see that the number of partitions  in DF
>> returned by orderBy was equal to the total number of distinct keys. Are you
>> observing the same, I mean do you have a single value for all rows in the
>> column on which you are running orderBy? If yes, you are better off not
>> running the orderBy clause.
>>
>> May be someone from spark sql team could answer that how should the
>> partitioning of the output DF be handled when doing an orderBy?
>>
>> Hemant
>> www.snappydata.io
>> https://github.com/SnappyDataInc/snappydata
>>
>>
>>
>>
>> On Tue, Feb 9, 2016 at 4:00 AM, Cesar Flores <ces...@gmail.com> wrote:
>>
>>>
>>> I have a data frame which I sort using orderBy function. This operation
>>> causes my data frame to go to a single partition. After using those
>>> results, I would like to re-partition to a larger number of partitions.
>>> Currently I am just doing:
>>>
>>> val rdd = df.rdd.coalesce(100, true) //df is a dataframe with a single
>>> partition and around 14 million records
>>> val newDF =  hc.createDataFrame(rdd, df.schema)
>>>
>>> This process is really slow. Is there any other way of achieving this
>>> task, or to optimize it (perhaps tweaking a spark configuration parameter)?
>>>
>>>
>>> Thanks a lot
>>> --
>>> Cesar Flores
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark Streaming with Druid?

2016-02-08 Thread Hemant Bhanawat
SnappyData's deployment is different that how Spark is deployed. See
http://snappydatainc.github.io/snappydata/deployment/ and
http://snappydatainc.github.io/snappydata/jobs/.

For further questions, you can join us on stackoverflow
http://stackoverflow.com/questions/tagged/snappydata.

Hemant


On Mon, Feb 8, 2016 at 10:04 PM, Umesh Kacha <umesh.ka...@gmail.com> wrote:

> Hi Hemant, thanks much can we use SnappyData on YARN. My Spark jobs run
> using yarn client mode. Please guide.
>
> On Mon, Feb 8, 2016 at 9:46 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> You may want to have a look at spark druid project already in progress:
>> https://github.com/SparklineData/spark-druid-olap
>>
>> You can also have a look at SnappyData
>> <https://github.com/SnappyDataInc/snappydata>, which is a low latency
>> store tightly integrated with Spark, Spark SQL and Spark Streaming. You can
>> find the 0.1 Preview release's documentation here.
>> <http://snappydatainc.github.io/snappydata/>
>>
>> Disclaimer: I am a SnappyData engineer.
>>
>> Hemant
>> www.snappydata.io
>>
>>
>> On Sun, Feb 7, 2016 at 12:47 AM, unk1102 <umesh.ka...@gmail.com> wrote:
>>
>>> Hi did anybody tried Spark Streaming with Druid as low latency store?
>>> Combination seems powerful is it worth trying both together? Please guide
>>> and share your experience. I am after creating the best low latency
>>> streaming analytics.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Spark Streaming with Druid?

2016-02-07 Thread Hemant Bhanawat
You may want to have a look at spark druid project already in progress:
https://github.com/SparklineData/spark-druid-olap

You can also have a look at SnappyData
<https://github.com/SnappyDataInc/snappydata>, which is a low latency store
tightly integrated with Spark, Spark SQL and Spark Streaming. You can find
the 0.1 Preview release's documentation here
<http://snappydatainc.github.io/snappydata/>.


Disclaimer: I am a SnappyData engineer.

Hemant
www.snappydata.io


On Sun, Feb 7, 2016 at 12:47 AM, unk1102  wrote:

> Hi did anybody tried Spark Streaming with Druid as low latency store?
> Combination seems powerful is it worth trying both together? Please guide
> and share your experience. I am after creating the best low latency
> streaming analytics.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-with-Druid-tp26164.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread Hemant Bhanawat
Missing order by?

Hemant Bhanawat
SnappyData (http://snappydata.io/)

On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j <jsatishchan...@gmail.com>
wrote:

> HI All,
> I have data in a emp_df (DataFrame) as mentioned below:
>
> EmpId   Sal   DeptNo
> 001   100   10
> 002   120   20
> 003   130   10
> 004   140   20
> 005   150   10
>
> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
> below:
>
> DeptNo  Sal   EmpId
> 10 150   005
> 10 130   003
> 10 100   001
> 20 140   004
> 20 120   002
>
> Now I want to pick highest paid EmpId of each DeptNo.,hence applied agg
> First method as below
>
>
> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>
> Expected output is DeptNo  TopSal
>   10005
>20   004
> But my output varies for each iteration such as
>
> First Iteration results as  Dept  TopSal
>   10 003
>20 004
>
> Secnd Iteration results as Dept  TopSal
>   10 005
>   20 004
>
> Third Iteration results as  Dept  TopSal
>   10 003
>   20 002
>
> Not sure why output varies on each iteration as no change in code and
> values in DataFrame
>
> Please let me know if any inputs on this
>
> Regards,
> Satish Chandra J
>


Re: DataFrame First method is resulting different results in each iteration

2016-02-03 Thread Hemant Bhanawat
Ahh.. missed that.

I see that you have used the "first" function. 'first' returns the first row
it has found. On a single executor it may return the right results, but on
multiple executors it will return the first row of any of the executors,
which may not be the first row when the results are combined.

I believe, if you change your query like this, you will get the right
results:

ordrd_emp_df.groupBy("DeptNo").
agg($"DeptNo", max("Sal").as("HighestSal"))

But as you can see, you get the highest Sal and not the EmpId with highest
Sal. For getting EmpId with highest Sal, you will have to change your query
to add filters or add subqueries. See the following thread:

http://stackoverflow.com/questions/6841605/get-top-1-row-of-each-group
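
For completeness, a sketch (untested) of one of the approaches from that link,
applied to the emp_df of this thread:

import org.apache.spark.sql.functions.max

// Compute the max Sal per DeptNo and join it back to recover the EmpId.
// (Ties would return more than one EmpId per DeptNo.)
val maxSal = emp_df.groupBy("DeptNo").agg(max("Sal").as("MaxSal"))
val topSal = emp_df
  .join(maxSal, emp_df("DeptNo") === maxSal("DeptNo") && emp_df("Sal") === maxSal("MaxSal"))
  .select(emp_df("DeptNo"), emp_df("EmpId"))
topSal.show()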

Hemant Bhanawat
SnappyData (http://snappydata.io/)


On Wed, Feb 3, 2016 at 4:33 PM, satish chandra j <jsatishchan...@gmail.com>
wrote:

> Hi Hemant,
> My dataframe "ordrd_emd_df" consist data in order as I have applied oderBy
> in the first step
> And also tried having "orderBy" method before "groupBy" than also getting
> different results in each iteration
>
> Regards,
> Satish Chandra
>
>
> On Wed, Feb 3, 2016 at 4:28 PM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Missing order by?
>>
>> Hemant Bhanawat
>> SnappyData (http://snappydata.io/)
>>
>>
>> On Wed, Feb 3, 2016 at 3:45 PM, satish chandra j <
>> jsatishchan...@gmail.com> wrote:
>>
>>> HI All,
>>> I have data in a emp_df (DataFrame) as mentioned below:
>>>
>>> EmpId   Sal   DeptNo
>>> 001   100   10
>>> 002   120   20
>>> 003   130   10
>>> 004   140   20
>>> 005   150   10
>>>
>>> ordrd_emp_df = emp_df.orderBy($"DeptNo",$"Sal".desc)  which results as
>>> below:
>>>
>>> DeptNo  Sal   EmpId
>>> 10 150   005
>>> 10 130   003
>>> 10 100   001
>>> 20 140   004
>>> 20 120   002
>>>
>>> Now I want to pick highest paid EmpId of each DeptNo.,hence applied agg
>>> First method as below
>>>
>>>
>>> ordrd_emp_df.groupBy("DeptNo").agg($"DeptNo",first("EmpId").as("TopSal")).select($"DeptNo",$"TopSal")
>>>
>>> Expected output is DeptNo  TopSal
>>>   10005
>>>20   004
>>> But my output varies for each iteration such as
>>>
>>> First Iteration results as  Dept  TopSal
>>>   10 003
>>>20 004
>>>
>>> Secnd Iteration results as Dept  TopSal
>>>   10 005
>>>   20 004
>>>
>>> Third Iteration results as  Dept  TopSal
>>>   10 003
>>>   20 002
>>>
>>> Not sure why output varies on each iteration as no change in code and
>>> values in DataFrame
>>>
>>> Please let me know if any inputs on this
>>>
>>> Regards,
>>> Satish Chandra J
>>>
>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Please find attached.



On Wed, Oct 7, 2015 at 7:36 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Hemant:
> Can you post the code snippet to the mailing list - other people would be
> interested.
>
> On Wed, Oct 7, 2015 at 5:50 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Will send you the code on your email id.
>>
>> On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>
>>> Thanks!
>>> Can you check if you can provide example of the conversion?
>>>
>>>
>>> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>>> Oh, this is an internal class of our project and I had used it without
>>>> realizing the source.
>>>>
>>>> Anyway, the idea is to  wrap the InternalRow in a class that derives
>>>> from Row. When you implement the functions of the trait 'Row ', the type
>>>> conversions from Row types to InternalRow types has to be done for each of
>>>> the types. But, as I can see, the primitive types (apart from String) don't
>>>> need conversions. Map and Array would need some handling.
>>>>
>>>> I will check with the author of this code, I think this code can be
>>>> contributed to Spark.
>>>>
>>>> Hemant
>>>> www.snappydata.io
>>>> linkedin.com/company/snappydata
>>>>
>>>> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>>>
>>>>> From which jar WrappedInternalRow comes from?
>>>>> It seems that I can't find it.
>>>>>
>>>>> BTW
>>>>> What I'm trying to do now is to create scala array from the fields and
>>>>> than create Row out of that array.
>>>>> The problem is that I get types mismatches...
>>>>>
>>>>> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat <hemant9...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> An approach can be to wrap your MutableRow in WrappedInternalRow
>>>>>> which is a child class of Row.
>>>>>>
>>>>>> Hemant
>>>>>> www.snappydata.io
>>>>>> linkedin.com/company/snappydata
>>>>>>
>>>>>>
>>>>>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>> I'm upgrading to Spark 1.5.
>>>>>>>
>>>>>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>>>>>> created GenericMutableRow
>>>>>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and 
>>>>>>> return it
>>>>>>> as org.apache.spark.sql.Row
>>>>>>>
>>>>>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>>>>>
>>>>>>> What do you suggest to do?
>>>>>>> How can I convert GenericMutableRow to Row?
>>>>>>>
>>>>>>> Prompt answer will be highly appreciated!
>>>>>>> Thanks,
>>>>>>> Ophir
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.collection

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
 * Wraps an `InternalRow` to expose a `Row`
 */
final class WrappedInternalRow(override val schema: StructType,
val converters: Array[(InternalRow, Int)

Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Oh, this is an internal class of our project and I had used it without
realizing the source.

Anyway, the idea is to wrap the InternalRow in a class that derives from
Row. When you implement the functions of the trait 'Row', the type
conversions between InternalRow types and Row types have to be done for each
of the types. But, as I can see, the primitive types (apart from String) don't
need conversions. Map and Array would need some handling.

I will check with the author of this code, I think this code can be
contributed to Spark.
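
For readers who just need an eager InternalRow-to-Row conversion rather than a
lazy wrapper, here is a sketch against Spark 1.5's internal converters (this is
not the attached class, and internal APIs can change between releases):

package org.apache.spark.sql.collection  // needed if the converter object is package-private

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType

object RowConversion {
  // Eagerly converts an InternalRow (e.g. a GenericMutableRow) to an external Row.
  // CatalystTypeConverters does the per-type work (UTF8String -> String, maps, arrays).
  def toExternalRow(internal: InternalRow, schema: StructType): Row = {
    val converter = CatalystTypeConverters.createToScalaConverter(schema)
    converter(internal).asInstanceOf[Row]
  }
}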

Hemant
www.snappydata.io
linkedin.com/company/snappydata

On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen <oph...@gmail.com> wrote:

> From which jar WrappedInternalRow comes from?
> It seems that I can't find it.
>
> BTW
> What I'm trying to do now is to create scala array from the fields and
> than create Row out of that array.
> The problem is that I get types mismatches...
>
> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> An approach can be to wrap your MutableRow in WrappedInternalRow which is
>> a child class of Row.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>>
>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>
>>> Hi Guys,
>>> I'm upgrading to Spark 1.5.
>>>
>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>> created GenericMutableRow
>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
>>> as org.apache.spark.sql.Row
>>>
>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>
>>> What do you suggest to do?
>>> How can I convert GenericMutableRow to Row?
>>>
>>> Prompt answer will be highly appreciated!
>>> Thanks,
>>> Ophir
>>>
>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-07 Thread Hemant Bhanawat
Will send you the code on your email id.

On Wed, Oct 7, 2015 at 4:37 PM, Ophir Cohen <oph...@gmail.com> wrote:

> Thanks!
> Can you check if you can provide example of the conversion?
>
>
> On Wed, Oct 7, 2015 at 2:05 PM, Hemant Bhanawat <hemant9...@gmail.com>
> wrote:
>
>> Oh, this is an internal class of our project and I had used it without
>> realizing the source.
>>
>> Anyway, the idea is to  wrap the InternalRow in a class that derives from
>> Row. When you implement the functions of the trait 'Row ', the type
>> conversions from Row types to InternalRow types has to be done for each of
>> the types. But, as I can see, the primitive types (apart from String) don't
>> need conversions. Map and Array would need some handling.
>>
>> I will check with the author of this code, I think this code can be
>> contributed to Spark.
>>
>> Hemant
>> www.snappydata.io
>> linkedin.com/company/snappydata
>>
>> On Wed, Oct 7, 2015 at 3:30 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>
>>> From which jar WrappedInternalRow comes from?
>>> It seems that I can't find it.
>>>
>>> BTW
>>> What I'm trying to do now is to create scala array from the fields and
>>> than create Row out of that array.
>>> The problem is that I get types mismatches...
>>>
>>> On Wed, Oct 7, 2015 at 8:03 AM, Hemant Bhanawat <hemant9...@gmail.com>
>>> wrote:
>>>
>>>> An approach can be to wrap your MutableRow in WrappedInternalRow which
>>>> is a child class of Row.
>>>>
>>>> Hemant
>>>> www.snappydata.io
>>>> linkedin.com/company/snappydata
>>>>
>>>>
>>>> On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen <oph...@gmail.com> wrote:
>>>>
>>>>> Hi Guys,
>>>>> I'm upgrading to Spark 1.5.
>>>>>
>>>>> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
>>>>> created GenericMutableRow
>>>>> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return 
>>>>> it
>>>>> as org.apache.spark.sql.Row
>>>>>
>>>>> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>>>>>
>>>>> What do you suggest to do?
>>>>> How can I convert GenericMutableRow to Row?
>>>>>
>>>>> Prompt answer will be highly appreciated!
>>>>> Thanks,
>>>>> Ophir
>>>>>
>>>>
>>>>
>>>
>>
>


Re: GenericMutableRow and Row mismatch on Spark 1.5?

2015-10-06 Thread Hemant Bhanawat
An approach can be to wrap your MutableRow in WrappedInternalRow which is a
child class of Row.

Hemant
www.snappydata.io
linkedin.com/company/snappydata


On Tue, Oct 6, 2015 at 3:21 PM, Ophir Cohen  wrote:

> Hi Guys,
> I'm upgrading to Spark 1.5.
>
> In our previous version (Spark 1.3 but it was OK on 1.4 as well) we
> created GenericMutableRow
> (org.apache.spark.sql.catalyst.expressions.GenericMutableRow) and return it
> as org.apache.spark.sql.Row
>
> Starting from Spark 1.5 GenericMutableRow isn't extends Row.
>
> What do you suggest to do?
> How can I convert GenericMutableRow to Row?
>
> Prompt answer will be highly appreciated!
> Thanks,
> Ophir
>


Re: [cache eviction] partition recomputation in big lineage RDDs

2015-10-01 Thread Hemant Bhanawat
As I understand it, you don't need a merge of your historical data RDD with
your RDD_inc; what you need is a merge of the computation results of your
historical RDD with RDD_inc, and so on.

IMO, you should consider having an external row store to hold your
computations. I say this because you need to update the rows of a prior
computation based on the new data. Spark cached batches are column oriented
and any update to a Spark cached batch is a costly op.


On Wed, Sep 30, 2015 at 10:59 PM, Nicolae Marasoiu <
nicolae.maras...@adswizz.com> wrote:

> Hi,
>
>
> An equivalent question would be: can the memory cache be selectively
> evicted from within a component run in the driver? I know it is breaking
> some abstraction/encapsulation, but clearly I need to evict part of the
> cache so that it is reloaded with newer values from DB.
>
>
> Because what I basically need is invalidating some portions of the data
> which have newer values. The "compute" method should be the same (read with
> TableInputFormat).
>
> Thanks
> Nicu
> --
> *From:* Nicolae Marasoiu 
> *Sent:* Wednesday, September 30, 2015 4:07 PM
> *To:* user@spark.apache.org
> *Subject:* Re: partition recomputation in big lineage RDDs
>
>
> Hi,
>
> In fact, my RDD will get a new version (a new RDD assigned to the same
> var) quite frequently, by merging bulks of 1000 events of events of last
> 10s.
>
> But recomputation would be more efficient to do not by reading initial RDD
> partition(s) and reapplying deltas, but by reading from HBase the latest
> data, and just compute on top of that if anything.
>
> Basically I guess I need to write my own RDD and implement compute method
> by sliding on hbase.
>
> Thanks,
> Nicu
> --
> *From:* Nicolae Marasoiu 
> *Sent:* Wednesday, September 30, 2015 3:05 PM
> *To:* user@spark.apache.org
> *Subject:* partition recomputation in big lineage RDDs
>
>
> Hi,
>
>
> If I implement a manner to have an up-to-date version of my RDD by
> ingesting some new events, called RDD_inc (from increment), and I provide a
> "merge" function m(RDD, RDD_inc), which returns the RDD_new, it looks like
> I can evolve the state of my RDD by constructing new RDDs all the time, and
> doing it in a manner that hopes to reuse as much data from the past RDD and
> make the rest garbage collectable. An example merge function would be a
> join on some ids, and creating a merged state for each element. The type of
> the result of m(RDD, RDD_inc) is the same type as that of RDD.
>
>
> My question on this is how does the recomputation work for such an RDD,
> which is not the direct result of hdfs load, but is the result of a long
> lineage of such functions/transformations:
>
>
> Lets say my RDD is now after 2 merge iterations like this:
>
> RDD_new = merge(merge(RDD, RDD_inc1), RDD_inc2)
>
>
> When recomputing a part of RDD_new here are my assumptions:
>
> - only full partitions are recomputed, nothing more granular?
>
> - the corresponding partitions of RDD, RDD_inc1 and RDD_inc2 are recomputed
>
> - the function are applied
>
>
> And this seems more simplistic, since the partitions do not fully align in
> the general case between all these RDDs. The other aspect is the
> potentially redundant load of data which is in fact not required anymore
> (the data ruled out in the merge).
>
>
> A more detailed version of this question is at
> https://www.quora.com/How-does-Spark-RDD-recomputation-avoids-duplicate-loading-or-computation/
>
>
> Thanks,
>
> Nicu
>


Re: flatmap() and spark performance

2015-09-28 Thread Hemant Bhanawat
You can use spark.executor.memory to specify the memory of the executors,
which will hold these intermediate results.

You may want to look at the section "Understanding Memory Management in
Spark" of this link:

https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html
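
For illustration, the setting itself (the value is a placeholder):

import org.apache.spark.SparkConf

// Placeholder value; size it so that the flatMap output of a partition fits
// comfortably within the storage/shuffle fractions described in the article.
val conf = new SparkConf()
  .setAppName("FlatMapJob")
  .set("spark.executor.memory", "8g")   // or: spark-submit --executor-memory 8g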


On Tue, Sep 29, 2015 at 10:51 AM, jeff saremi 
wrote:

> Is there anyway to let spark know ahead of time what size of RDD to expect
> as a result of a flatmap() operation?
> And would that help in terms of performance?
> For instance, if I have an RDD of 1million rows and I know that my
> flatMap() will produce 100million rows, is there a way to indicate that to
> Spark? to say "reserve" space for the resulting RDD?
>
> thanks
> Jeff
>


Re: caching DataFrames

2015-09-23 Thread Hemant Bhanawat
Two DataFrames do not share cache storage in Spark. Hence it's immaterial how
the two DataFrames are related to each other. Both of them are going to
consume memory based on the data that they have. So for your A1 and B1 you
would need extra memory that would be roughly equivalent to half the memory
of A/B.

You can check the storage that a dataFrame is consuming in the Spark UI's
Storage tab. http://host:4040/storage/



On Thu, Sep 24, 2015 at 5:37 AM, Zhang, Jingyu 
wrote:

> I have A and B DataFrames
> A has columns a11,a12, a21,a22
> B has columns b11,b12, b21,b22
>
> I persistent them in cache
> 1. A.Cache(),
> 2.  B.Cache()
>
> Then, I persistent the subset in cache later
>
> 3. DataFrame A1 (a11,a12).cache()
>
> 4. DataFrame B1 (b11,b12).cache()
>
> 5. DataFrame AB1 (a11,a12,b11,b12).cahce()
>
> Can you please tell me what happen for caching case (3,4, and 5) after A
> and B cached?
> How much  more memory do I need compare with Caching 1 and 2 only?
>
> Thanks
>
> Jingyu
>


Re: DataGenerator for streaming application

2015-09-21 Thread Hemant Bhanawat
Why are you using rawSocketStream to read the data? I believe
rawSocketStream waits for a big chunk of data before it can start
processing it. I think what you are writing is a String, so you should use
socketTextStream, which reads the data on a per-line basis.
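
A sketch of the change in the Benchmark code quoted below; only the stream
creation changes:

// Replace ssc.rawSocketStream[String](...) with socketTextStream so that each
// line written by the generator becomes one record:
val rawStreams = (1 to numStreams).map(_ =>
  ssc.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER)).toArray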

On Sun, Sep 20, 2015 at 9:56 AM, Saiph Kappa  wrote:

> Hi,
>
> I am trying to build a data generator that feeds a streaming application.
> This data generator just reads a file and send its lines through a socket.
> I get no errors on the logs, and the benchmark bellow always prints
> "Received 0 records". Am I doing something wrong?
>
>
> object MyDataGenerator {
>
>   def main(args: Array[String]) {
> if (args.length != 3) {
>   System.err.println("Usage: RawTextSender   ")
>   System.exit(1)
> }
> // Parse the arguments using a pattern match
> val (port, file, sleepMillis) = (args(0).toInt, args(1), args(2).toInt)
>
> val serverSocket = new ServerSocket(port)
> println("Listening on port " + port)
>
>
> while (true) {
>   val socket = serverSocket.accept()
>   println("Got a new connection")
>
>
>   val out = new PrintWriter(socket.getOutputStream)
>   try {
> var count = 0
> var startTimestamp = -1
> for (line <- Source.fromFile(file).getLines()) {
>   val ts = line.substring(2, line.indexOf(',',2)).toInt
>   if(startTimestamp < 0)
> startTimestamp = ts
>
>   if(ts - startTimestamp <= 30) {
> out.println(line)
> count += 1
>   } else {
> println(s"Emmited reports: $count")
> count = 0
> out.flush()
> startTimestamp = ts
> Thread.sleep(sleepMillis)
>   }
> }
>   } catch {
> case e: IOException =>
>   println("Client disconnected")
>   socket.close()
>   }
> }
> }
> }
>
>
>
> object Benchmark {
>   def main(args: Array[String]) {
> if (args.length != 4) {
>   System.err.println("Usage: RawNetworkGrep
> ")
>   System.exit(1)
> }
>
> val (numStreams, host, port, batchMillis) = (args(0).toInt, args(1), 
> args(2).toInt, args(3).toInt)
> val sparkConf = new SparkConf()
> sparkConf.setAppName("BenchMark")
> 
> sparkConf.setJars(Array("target/scala-2.10/benchmark-app_2.10-0.1-SNAPSHOT.jar"))
> sparkConf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
> sparkConf.set("spark.executor.extraJavaOptions", " -XX:+UseCompressedOops 
> -XX:+UseConcMarkSweepGC -XX:+AggressiveOpts -XX:FreqInlineSize=300 
> -XX:MaxInlineSize=300 ")
> if (sparkConf.getOption("spark.master") == None) {
>   // Master not set, as this was not launched through Spark-submit. 
> Setting master as local."
>   sparkConf.setMaster("local[*]")
> }
>
> // Create the context
> val ssc = new StreamingContext(sparkConf, Duration(batchMillis))
>
> val rawStreams = (1 to numStreams).map(_ =>
>   ssc.rawSocketStream[String](host, port, 
> StorageLevel.MEMORY_ONLY_SER)).toArray
> val union = ssc.union(rawStreams)
> union.count().map(c => s"Received $c records").print()
> ssc.start()
> ssc.awaitTermination()
>   }
> }
>
> Thanks.
>
>


Re: Why are executors on slave never used?

2015-09-21 Thread Hemant Bhanawat
When you specify the master as local[2], it starts all the Spark components
in a single JVM. You need to specify the master correctly.
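
For illustration, a sketch of the change (shown in Scala; the PySpark call is
analogous, and both master values are placeholders):

// Point the application at the real cluster manager instead of a single local
// JVM. On EMR the master is normally YARN; for a standalone cluster it is the
// spark:// URL of the master.
val conf = new org.apache.spark.SparkConf()
  .setAppName("MyApp")
  .setMaster("yarn-client")            // or "spark://<master-host>:7077"
// The PySpark equivalent is conf.setMaster('yarn-client').
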
I have a default AWS EMR cluster (1 master, 1 slave) with Spark. When I run
a Spark process, it works fine -- but only on the master, as if it were
standalone.

The web-UI and logging code shows only 1 executor, the localhost.

How can I diagnose this?

(I create *SparkConf, *in Python, with *setMaster('local[2]'). )*

(Strangely, though I don't think that this causes the problem, there is
almost nothing spark-related on the slave machine:* /usr/lib/spark *has a
few jars, but that's it:  *datanucleus-api-jdo.jar  datanucleus-core.jar
 datanucleus-rdbms.jar  spark-yarn-shuffle.jar. *But this is an AWS EMR
cluster as created by* create-cluster*, so I would assume that the slave
and master are configured OK out-of the box.)

Joshua


Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-17 Thread Hemant Bhanawat
The driver timing out laggards seems like a reasonable way of handling them.
Are there any challenges because of which the driver does not do it today?
Is there a JIRA for this? I couldn't find one.





On Tue, Sep 15, 2015 at 12:07 PM, Akhil Das 
wrote:

> As of now i think its a no. Not sure if its a naive approach, but yes you
> can have a separate program to keep an eye in the webui (possibly parsing
> the content) and make it trigger the kill task/job once it detects a lag.
> (Again you will have to figure out the correct numbers before killing any
> job)
>
> Thanks
> Best Regards
>
> On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Is there a way in Spark to automatically terminate laggard "stage's",
>> ones that appear to be hanging?   In other words, is there a timeout for
>> processing of a given RDD?
>>
>> In the Spark GUI, I see the "kill" function for a given Stage under
>> 'Details for Job <...>".
>>
>> Is there something in Spark that would identify and kill laggards
>> proactively?
>>
>> Thanks.
>>
>
>


Re: Difference between sparkDriver and "executor ID driver"

2015-09-16 Thread Hemant Bhanawat
1. When you call new SparkContext(), the Spark driver is started, which
internally creates an Akka ActorSystem that registers on this port.

2. Since you are running in local mode, the starting of an executor is
short-circuited and an Executor object is created in the same process (see
LocalEndpoint). This Executor object logs this message with the executor ID
"driver".

On Wed, Sep 16, 2015 at 9:44 AM, Muler  wrote:

> I'm running Spark in local mode and getting these two log messages who
> appear to be similar. I want to understand what each is doing:
>
>
>1. [main] util.Utils (Logging.scala:logInfo(59)) - Successfully
>started service 'sparkDriver' on port 60782.
>2. [main] executor.Executor (Logging.scala:logInfo(59)) - Starting
>executor ID driver on host localhost
>
> 1. is created using:
>
> val actorSystemName = if (isDriver) driverActorSystemName else
> executorActorSystemName
>
> val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf,
> securityManager)
> val actorSystem = rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
>
> 2. is created when:
>
>  _taskScheduler.start()
>
>
> What is the difference and what does each do?
>
>
>


Re: taking an n number of rows from and RDD starting from an index

2015-09-02 Thread Hemant Bhanawat
I think rdd.toLocalIterator is what you want. But note that it will keep one
partition's data in memory on the driver at a time.
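
A sketch of how it could be used for the "n rows starting from an index"
requirement:

import org.apache.spark.rdd.RDD

// Stream partitions to the driver one at a time and slice out the window
// [startIndex, startIndex + n). Only one partition is materialized in driver
// memory at any point.
def takeRange[T](rdd: RDD[T], startIndex: Int, n: Int): Seq[T] =
  rdd.toLocalIterator.drop(startIndex).take(n).toVector

// e.g. takeRange(someRdd, 1000, 1000) returns 1000 rows starting at index 1000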

On Wed, Sep 2, 2015 at 10:05 AM, Niranda Perera 
wrote:

> Hi all,
>
> I have a large set of data which would not fit into the memory. So, I wan
> to take n number of data from the RDD given a particular index. for an
> example, take 1000 rows starting from the index 1001.
>
> I see that there is a  take(num: Int): Array[T] method in the RDD, but it
> only returns the 'first n number of rows'.
>
> the simplest use case of this, requirement is, say, I write a custom
> relation provider with a custom relation extending the InsertableRelation.
>
> say I submit this query,
> "insert into table abc select * from xyz sort by x asc"
>
> in my custom relation, I have implemented the def insert(data: DataFrame,
> overwrite: Boolean): Unit
> method. here, since the data is large, I can not call methods such as
> DataFrame.collect(). Instead, I could do, DataFrame.foreachpartition(...).
> As you could see, the resultant DF from the "select * from xyz sort by x
> asc" is sorted, and if I sun, foreachpartition on that DF and implement the
> insert method, this sorted order would be affected, since the inserting
> operation would be done in parallel in each partition.
>
> in order to handle this, my initial idea was to take rows from the RDD in
> batches and do the insert operation, and for that I was looking for a
> method to take n number of rows starting from a given index.
>
> is there any better way to handle this, in RDDs?
>
> your assistance in this regard is highly appreciated.
>
> cheers
>
> --
> Niranda
> @n1r44 
> https://pythagoreanscript.wordpress.com/
>


Re: How to Serialize and Reconstruct JavaRDD later?

2015-09-02 Thread Hemant Bhanawat
You want to persist state between the execution of two RDDs. So, I believe
what you need is serialization of your model and not of the JavaRDD. If you
can serialize your model, you can persist it in HDFS or some other datastore
to be used by the next RDDs.

If you are using Spark Streaming, doing this would be easy.
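
If the model class has no save/load of its own, one generic option (a sketch
with hypothetical helper names; the path is a placeholder) is Java
serialization via object files:

import scala.reflect.ClassTag
import org.apache.spark.SparkContext

// Persist any serializable model to HDFS between jobs/batches.
def saveModel[M <: Serializable : ClassTag](sc: SparkContext, model: M, path: String): Unit =
  sc.parallelize(Seq(model), 1).saveAsObjectFile(path)

def loadModel[M : ClassTag](sc: SparkContext, path: String): M =
  sc.objectFile[M](path).first()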

On Wed, Sep 2, 2015 at 4:54 PM, Raja Reddy  wrote:

> Hi All,
>
> *Context:*
> I am exploring topic modelling with LDA with Spark MLLib. However, I need
> my model to enhance as more batches of documents come in.
>
> As of now I see no way of doing something like this, which gensim
>  does:
>
> lda.update(other_corpus)
>
> The only way I can enhance my model is essentially to recompute the
> LDAModel over all the documents accumulated after a new batch arrives.
>
> *Question:*
> One of the time consuming steps before performing topic modelling would be
> to construct the corpus as JavaRDD object, while reading through the actual
> documents.
>
> Capability to serialize a JavaRDD instance and reconstructing JavaRDD from
> the serialized snapshot would be helpful in this case. Suppose say I
> construct and serialize JavaRDD after reading Batch-1 of documents. When
> the Batch-2 arrives, I would like to deserialize the previously serialized
> RDD and mutate it with contents of new batch of documents. Could someone
> please let me know if serialization and deserialization of a JavaRDD
> instance is possible? I will have more questions if serialization is
> possible, mostly to do with changing spark configuration in between a
> serialization operation and deserialization operation.
>
> Thanks and Regards,
> Raja.
>


Re: Performance issue with Spark join

2015-08-26 Thread Hemant Bhanawat
Spark joins are different from traditional database joins because of the
lack of support for indexes. Spark has to shuffle data between nodes to
perform joins, hence joins are bound to be much slower than a count, which
is just a parallel scan of the data.

Still, to ensure that nothing is wrong with the setup, you may want to look
at your Spark task UI, in particular at the Shuffle Read and Shuffle Write
metrics.

On Wed, Aug 26, 2015 at 3:08 PM, lucap luca-pi...@hotmail.it wrote:

 Hi,

 I'm trying to perform an ETL using Spark, but as soon as I start performing
 joins performance degrades a lot. Let me explain what I'm doing and what I
 found out until now.

 First of all, I'm reading avro files that are on a Cloudera cluster, using
 commands like this:
 /val tab1 = sc.hadoopFile(hdfs:///path/to/file,
 classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
 classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
 classOf[org.apache.hadoop.io.NullWritable], 10)/

 After this, I'm applying some filter functions to data (to reproduce
 where
 clauses of the original query) and then I'm using one map for each table in
 order to translate RDD elements in (key,record) format. Let's say I'm doing
 this:
 /val elabTab1 = tab1.filter(...).map()/

 It is important to notice that if I do something like /elabTab1.first/ or
 /elabTab1.count/ the task is performed in a short time, let's say around
 impala's time. Now I need to do the following:
 /val joined = elabTab1.leftOuterJoin(elabTab2)/
 Then I tried something like /joined.count/ to test performance, but it
 degraded really a lot (let's say that a count on a single table takes like
 4
 seconds and the count on the joined table takes 12 minutes). I think
 there's
 a problem with the configuration, but what might it be?

 I'll give you some more information:
 1] Spark is running on YARN on a Cloudera cluster
 2] I'm starting spark-shell with a command like /spark-shell
 --executor-cores 5 --executor-memory 10G/ that gives the shell approx 10
 vcores and 25 GB of memory
 3] The task seems still for a lot of time after the map tasks, with the
 following message in console: /Asked to send map output locations for
 shuffle ... to .../
 4] If I open the stderr of the executors, I can read plenty of messages
 like
 the following: /Thread ... spilling in-memory map of ... MB to disk/, where
 MBs are in the order of 300-400
 5] I tried to raise the number of executors, but the situation didn't seem
 to change much. I also tried to change the number of splits of the avro
 files (currently set to 10), but it didn't seem to change much as well
 6] Tables aren't particularly big, the bigger one should be few GBs

 Regards,
 Luca



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Exception throws when running spark pi in Intellij Idea that scala.collection.Seq is not found

2015-08-25 Thread Hemant Bhanawat
Go to the module settings of the project and, in the dependencies section,
check the scope of the Scala jars. It would be either Test or Provided.
Change it to Compile and it should work. Check the following link to
understand more about dependency scopes:

https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html



On Tue, Aug 25, 2015 at 12:18 PM, Todd bit1...@163.com wrote:

 I cloned the code from https://github.com/apache/spark to my machine. It
 can compile successfully,
 But when I run the sparkpi, it throws an exception below complaining the
 scala.collection.Seq is not found.
 I have installed scala2.10.4 in my machine, and use the default profiles:
 window,scala2.10,maven-3,test-java-home.
 In Idea, I can find that the Seq class is on my classpath:





 Exception in thread main java.lang.NoClassDefFoundError:
 scala/collection/Seq
 at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
 Caused by: java.lang.ClassNotFoundException: scala.collection.Seq
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
 ... 6 more




Re: How to set environment of worker applications

2015-08-25 Thread Hemant Bhanawat
Ok, I went in the direction of system vars since the beginning, probably
because the question was about passing variables to a particular job.

Anyway, the decision to use either system vars or environment vars would
solely depend on whether you want to make them available to all the Spark
processes on a node or to a particular job.

Are there any other reasons why one would prefer one over the other?


On Mon, Aug 24, 2015 at 8:48 PM, Raghavendra Pandey 
raghavendra.pan...@gmail.com wrote:

 System properties and environment variables are two different things.. One
 can use spark.executor.extraJavaOptions to pass system properties and
 spark-env.sh to pass environment variables.

 -raghav

 On Mon, Aug 24, 2015 at 1:00 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 That's surprising. Passing the environment variables using
 spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
 fetching them using System.getProperty(myenvvar) has worked for me.

 What is the error that you guys got?

 On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
 vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with
 spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







Re: Joining using mulitimap or array

2015-08-24 Thread Hemant Bhanawat
In your example, a.attributes.name is a list, not a string. Run this
to find out:

a.select($"a.attributes.name").show()
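
If the intent is to join the two profiles on the email attribute, a minimal sketch
(assuming a Spark 1.5-era or later DataFrame API; the helper name emails is illustrative)
is to explode the attributes array first and then join on the attribute value:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, explode}

// Flatten each profile into one row per attribute, keep only the email
// attributes, and expose (name, email) columns prefixed with the given alias.
def emails(df: DataFrame, prefix: String): DataFrame =
  df.select(col("name").as(prefix + "_name"), explode(col("attributes")).as("attr"))
    .where(col("attr.name") === "email")
    .select(col(prefix + "_name"), col("attr.value").as(prefix + "_email"))

// Usage with the a and b DataFrames from the question:
//   emails(a, "a").join(emails(b, "b"), col("a_email") === col("b_email")).show()
// which should produce one row for the shared al...@mail.com address.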


On Mon, Aug 24, 2015 at 2:51 PM, Ilya Karpov i.kar...@cleverdata.ru wrote:

 Hi, guys
 I'm confused about joining columns in SparkSQL and need your advice.
 I want to join 2 datasets of profiles. Each profile has name and array of
 attributes(age, gender, email etc).
 There can be mutliple instances of attribute with the same name, e.g.
 profile has 2 emails - so 2 attributes with name = 'email' in
 array. Now I want to join 2 datasets using 'email' attribute. I cant find
 the way to do it :(

 The code is below. Now result of join is empty, while I expect to see 1
 row with all Alice emails.

 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.{SparkConf, SparkContext}

 case class Attribute(name: String, value: String, weight: Float)
 case class Profile(name: String, attributes: Seq[Attribute])

 object SparkJoinArrayColumn {
   def main(args: Array[String]) {
     val sc: SparkContext = new SparkContext(new
 SparkConf().setMaster("local").setAppName(getClass.getSimpleName))
     val sqlContext: SQLContext = new SQLContext(sc)

     import sqlContext.implicits._

     val a: DataFrame = sc.parallelize(Seq(
       Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f),
         Attribute("email", "a.jo...@mail.com", 1.0f)))
     )).toDF.as("a")

     val b: DataFrame = sc.parallelize(Seq(
       Profile("Alice", Seq(Attribute("email", "al...@mail.com", 1.0f),
         Attribute("age", "29", 0.2f)))
     )).toDF.as("b")

     a.where($"a.attributes.name" === "email")
       .join(
         b.where($"b.attributes.name" === "email"),
         $"a.attributes.value" === $"b.attributes.value"
       )
       .show()
   }
 }

 Thanks forward!
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to set environment of worker applications

2015-08-24 Thread Hemant Bhanawat
That's surprising. Passing the environment variables using
spark.executor.extraJavaOptions=-Dmyenvvar=xxx to the executor and then
fetching them using System.getProperty(myenvvar) has worked for me.

What is the error that you guys got?

On Mon, Aug 24, 2015 at 12:10 AM, Sathish Kumaran Vairavelu 
vsathishkuma...@gmail.com wrote:

 spark-env.sh works for me in Spark 1.4 but not
 spark.executor.extraJavaOptions.

 On Sun, Aug 23, 2015 at 11:27 AM Raghavendra Pandey 
 raghavendra.pan...@gmail.com wrote:

 I think the only way to pass on environment variables to worker node is
 to write it in spark-env.sh file on each worker node.

 On Sun, Aug 23, 2015 at 8:16 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Check for spark.driver.extraJavaOptions and
 spark.executor.extraJavaOptions in the following article. I think you can
 use -D to pass system vars:

 spark.apache.org/docs/latest/configuration.html#runtime-environment
 Hi,

 I am starting a spark streaming job in standalone mode with spark-submit.

 Is there a way to make the UNIX environment variables with which
 spark-submit is started available to the processes started on the worker
 nodes?

 Jan
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: How to set environment of worker applications

2015-08-23 Thread Hemant Bhanawat
Check for spark.driver.extraJavaOptions and spark.executor.extraJavaOptions
in the following article. I think you can use -D to pass system vars:

spark.apache.org/docs/latest/configuration.html#runtime-environment
Hi,

I am starting a spark streaming job in standalone mode with spark-submit.

Is there a way to make the UNIX environment variables with which
spark-submit is started available to the processes started on the worker
nodes?

Jan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


Re: PySpark concurrent jobs using single SparkContext

2015-08-21 Thread Hemant Bhanawat
It seems like you want simultaneous processing of multiple jobs but, at the
same time, serialization of a few tasks within those jobs. I don't know how to
achieve that in Spark.

But why would you worry about the interweaved processing when the data
that is being aggregated in different jobs is per customer, per day? Is it
that save_aggregate depends on the results of other customers and/or other
days?

I also don't understand how you would achieve that with yarn because
interweaving of tasks of separately submitted jobs may happen with dynamic
executor allocation as well.

Hemant


On Thu, Aug 20, 2015 at 7:04 PM, Mike Sukmanowsky 
mike.sukmanow...@gmail.com wrote:

 Hi all,

 We're using Spark 1.3.0 via a small YARN cluster to do some log
 processing. The jobs are pretty simple, for a number of customers and a
 number of days, fetch some event log data, build aggregates and store those
 aggregates into a data store.

 The way our script is written right now does something akin to:

 with SparkContext() as sc:
     for customer in customers:
         for day in days:
             logs = sc.textFile(get_logs(customer, day))
             aggregate = make_aggregate(logs)
             # This function contains the action saveAsNewAPIHadoopFile which
             # triggers a save
             save_aggregate(aggregate)

 ​
 So we have a Spark job per customer, per day.

 I tried doing some parallel job submission with something similar to:

 def make_and_save_aggregate(customer, day, spark_context):
     # Without a separate threading.Lock() here or better yet, one guarding the
     # Spark context, multiple customer/day transformations and actions could
     # be interweaved
     sc = spark_context
     logs = sc.textFile(get_logs(customer, day))
     aggregate = make_aggregate(logs)
     save_aggregate(aggregate)

 with SparkContext() as sc, futures.ThreadPoolExecutor(4) as executor:
     for customer in customers:
         for day in days:
             executor.submit(make_and_save_aggregate, customer, day, sc)

 ​
 The problem is, with no locks on a SparkContext except during
 initialization
 https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L214-L241
  and
 shutdown
 https://github.com/apache/spark/blob/v1.3.0/python/pyspark/context.py#L296-L307,
 operations on the context could (if I understand correctly) be interweaved
 leading to DAG which contains transformations out of order and from
 different customer, day periods.

 One solution is instead to launch multiple Spark jobs via spark-submit and
 let YARN/Spark's dynamic executor allocation take care of fair scheduling.
 In practice, this doesn't seem to yield very fast computation perhaps due
 to some additional overhead with YARN.

 Is there any safe way to launch concurrent jobs like this using a single
 PySpark context?

 --
 Mike Sukmanowsky
 Aspiring Digital Carpenter

 *e*: mike.sukmanow...@gmail.com

 LinkedIn http://www.linkedin.com/profile/view?id=10897143 | github
 https://github.com/msukmanowsky




Re: persist for DStream

2015-08-20 Thread Hemant Bhanawat
Are you asking for something more than this?

http://spark.apache.org/docs/latest/streaming-programming-guide.html#caching--persistence
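
A minimal sketch of that persistence call (a socket source is used instead of Kafka
just to keep it self-contained; names are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object PersistDStream {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("persist-dstream"), Seconds(10))

    // Stand-in for the Kafka DStream and its transformations.
    val parsed = ssc.socketTextStream("localhost", 9999).map(_.toUpperCase)

    // Keep each batch's RDDs in memory so later operations on the same
    // DStream reuse them instead of recomputing from the source.
    parsed.persist(StorageLevel.MEMORY_ONLY_SER)

    parsed.count().print()                        // first use of the cached batch
    parsed.filter(_.contains("ERROR")).print()    // second use, served from memory

    ssc.start()
    ssc.awaitTermination()
  }
}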



On Thu, Aug 20, 2015 at 2:09 PM, Deepesh Maheshwari 
deepesh.maheshwar...@gmail.com wrote:

 Hi,

 there are function available tp cache() or persist() RDD in memory but i
 am reading data from kafka in form of DStream and applying operation it and
 i want to persist that DStream in memory for further.

 Please suggest method how i can persist DStream in memory.

 Regards,
 Deepesh



Re: spark.sql.shuffle.partitions=1 seems to be working fine but creates timeout for large skewed data

2015-08-20 Thread Hemant Bhanawat
Sorry, I misread your mail. Thanks for pointing that out.

BTW, are the 800,000 files shuffle intermediate output and not the final
output? I assume yes. I didn't know that you could keep intermediate output
on HDFS, and I don't think that is recommended.
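
A minimal sketch of one common alternative for the original small-files problem
(assuming Spark 1.4+; the function, path and file count are illustrative): keep
spark.sql.shuffle.partitions at a reasonable value and shrink only the written output.

import org.apache.spark.sql.DataFrame

// Coalesce only the final result down to a handful of files per write;
// coalesce is a narrow dependency, so it just merges output tasks instead of
// funnelling the whole shuffle through a single reducer.
def writeCompacted(result: DataFrame, path: String, numFiles: Int = 4): Unit =
  result.coalesce(numFiles).write.parquet(path)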




On Thu, Aug 20, 2015 at 2:43 PM, Hemant Bhanawat hemant9...@gmail.com
wrote:

 Looks like you are using hash-based shuffling and not sort-based shuffling,
 which creates a single file per map task.

 On Thu, Aug 20, 2015 at 12:43 AM, unk1102 umesh.ka...@gmail.com wrote:

 Hi, I have a Spark job which deals with a large skewed dataset. I have around
 1000 Hive partitions to process in four different tables every day. So if I
 go with the 200 default spark.sql.shuffle.partitions created by Spark,
 I end up with 4 * 1000 * 200 = 800,000 small files in HDFS, which won't
 be good for the HDFS namenode. I have been told that if you keep on creating
 such a large number of small files the namenode will crash - is it true? Please
 help me understand. Anyway, to avoid creating small files I set
 spark.sql.shuffle.partitions=1. It seems to create 1 output file, and as
 per my understanding, because of only one output there is so much shuffling
 to do to bring all data to one reducer - please correct me if I am wrong.
 This is causing memory/timeout issues. How do I deal with it?

 I tried to give spark.shuffle.storage=0.7 also, but still this memory seems not
 enough for it. I have a 25 GB executor with 4 cores and 20 such executors, and
 still the Spark job fails. Please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-sql-shuffle-partitions-1-seems-to-be-working-fine-but-creates-timeout-for-large-skewed-data-tp24346.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Regarding rdd.collect()

2015-08-18 Thread Hemant Bhanawat
On Tue, Aug 18, 2015 at 1:16 PM, Dawid Wysakowicz 
wysakowicz.da...@gmail.com wrote:

 No, the data is not stored between two jobs. But it is stored for a
 lifetime of a job. Job can have multiple actions run.

I too thought so but wanted to confirm. Thanks.
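
A minimal sketch of the behaviour under discussion, within one application: the cached
partitions stay in executor memory across actions, and collect() only brings a copy
back to the driver.

import org.apache.spark.{SparkConf, SparkContext}

object CacheAcrossActions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("cache-demo"))

    val rdd = sc.parallelize(1 to 1000000).map(_ * 2).cache()

    val n = rdd.count()        // first action: computes the RDD and caches its partitions
    val copy = rdd.collect()   // second action: reads the cached partitions, ships a copy to the driver

    println(s"count=$n, size of driver-side copy=${copy.length}")
    sc.stop()
  }
}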


 For a matter of sharing an rdd between jobs you can have a look at Spark
 Job Server(spark-jobserver https://github.com/ooyala/spark-jobserver)
 or some In-Memory storages: Tachyon(http://tachyon-project.org/) or
 Ignite(https://ignite.incubator.apache.org/)

 2015-08-18 9:37 GMT+02:00 Hemant Bhanawat hemant9...@gmail.com:

 It is still in memory for future rdd transformations and actions.

 This is interesting. You mean Spark holds the data in memory between two
 job executions.  How does the second job get the handle of the data in
 memory? I am interested in knowing more about it. Can you forward me a
 spark article or JIRA that talks about it?

 On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What
 you get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++






Re: global variable in spark streaming with no dependency on key

2015-08-18 Thread Hemant Bhanawat
See if SparkContext.accumulator helps.
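
A minimal sketch of that suggestion (Spark 1.x accumulator API; the socket source and
the simple event counter are illustrative stand-ins for the time-like global the poster
wants):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object GlobalCounter {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("global-counter"), Seconds(5))
    val events = ssc.socketTextStream("localhost", 9999)

    // A driver-side accumulator acts as a key-independent global that every
    // micro-batch adds to; tasks on the executors only write to it.
    val totalEvents = ssc.sparkContext.accumulator(0L, "totalEvents")

    events.foreachRDD { rdd =>
      rdd.foreach(_ => totalEvents += 1L)               // updated on the executors
      println(s"events so far: ${totalEvents.value}")   // read back on the driver
    }

    ssc.start()
    ssc.awaitTermination()
  }
}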

On Tue, Aug 18, 2015 at 2:27 PM, Joanne Contact joannenetw...@gmail.com
wrote:

 Hi Gurus,

 Please help.

 But please don't tell me to use updateStateByKey because I need a
 global variable (something like the clock time) across the micro
 batches but not depending on key. For my case, it is not acceptable to
 maintain a state for each key since each key comes in different times.
 Yes my global variable is related to time but cannot use machine
 clock.

 Any hint? Or is this lack of global variable by design?

 Thanks!

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Regarding rdd.collect()

2015-08-18 Thread Hemant Bhanawat
It is still in memory for future rdd transformations and actions.

This is interesting. You mean Spark holds the data in memory between two
job executions? How does the second job get a handle on the data in
memory? I am interested in knowing more about it. Can you point me to a
Spark article or JIRA that talks about it?

On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan 
sabarish.sasidha...@manthan.com wrote:

 It is still in memory for future rdd transformations and actions. What you
 get in driver is a copy of the data.

 Regards
 Sab

 On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote:

 When I do an rdd.collect().. The data moves back to driver  Or is still
 held in memory across the executors?




 --

 Architect - Big Data
 Ph: +91 99805 99458

 Manthan Systems | *Company of the year - Analytics (2014 Frost and
 Sullivan India ICT)*
 +++



Re: registering an empty RDD as a temp table in a PySpark SQL context

2015-08-18 Thread Hemant Bhanawat
It is definitely not the case for Spark SQL. A temporary table (much like a
DataFrame) is just a logical plan with a name, and it is not iterated
unless a query is fired on it.

I am not sure if using rdd.take in the Python code to verify the schema is the
right approach, as it creates a Spark job.

BTW, why would you want to update the Spark code? rdd.take in the Python code is
the problem. All you want is to avoid the schema verification in
createDataFrame. I do not see any issue on the Spark side in the way it
handles an RDD that has no data.


On Tue, Aug 18, 2015 at 1:23 AM, Eric Walker e...@node.io wrote:

 I have an RDD queried from a scan of a data source.  Sometimes the RDD has
 rows and at other times it has none.  I would like to register this RDD as
 a temporary table in a SQL context.  I suspect this will work in Scala, but
 in PySpark some code assumes that the RDD has rows in it, which are used to
 verify the schema:


 https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299

 Before I attempt to extend the Scala code to handle an empty RDD or
 provide an empty DataFrame that can be registered, I was wondering what
 people recommend in this case.  Perhaps there's a simple way of registering
 an empty RDD as a temporary table in a PySpark SQL context that I'm
 overlooking.

 An alternative is to add special case logic in the client code to deal
 with an RDD backed by an empty table scan.  But since the SQL will already
 handle that, I was hoping to avoid special case logic.

 Eric




Re: Apache Spark - Parallel Processing of messages from Kafka - Java

2015-08-16 Thread Hemant Bhanawat
In Spark, every action (foreach, collect etc.) gets converted into a Spark
job, and jobs are executed sequentially.

You may want to refactor your code in calculateUseCase? to just run
transformations (map, flatMap) and call a single action at the end, as in the
sketch below.
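
A minimal Scala sketch of that refactoring (the Scala DStream API is used here in
place of the Java one from the question; names are illustrative):

import org.apache.spark.streaming.dstream.DStream

// Keep the per-use-case work as pure transformations and drive them with a
// single output operation, so each batch turns into one job instead of four
// sequential ones. Assumes the four DStreams carry the same element type.
def writeAllUseCases(useCase1: DStream[String], useCase2: DStream[String],
                     useCase3: DStream[String], useCase4: DStream[String]): Unit =
  useCase1.union(useCase2).union(useCase3).union(useCase4)
    .foreachRDD(rdd => rdd.foreachPartition(_.foreach(println)))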

On Sun, Aug 16, 2015 at 3:19 PM, mohanaugust mohanaug...@gmail.com wrote:

 JavaPairReceiverInputDStream<String, byte[]> messages =
 KafkaUtils.createStream(...);
 JavaPairDStream<String, byte[]> filteredMessages =
 filterValidMessages(messages);

 JavaDStream<String> useCase1 = calculateUseCase1(filteredMessages);
 JavaDStream<String> useCase2 = calculateUseCase2(filteredMessages);
 JavaDStream<String> useCase3 = calculateUseCase3(filteredMessages);
 JavaDStream<String> useCase4 = calculateUseCase4(filteredMessages);
 ...

 I retrieve messages from Kafka, filter that and use the same messages for
 mutiple use-cases. Here useCase1 to 4 are independent of each other and can
 be calculated parallely. However, when i look at the logs, i see that
 calculations are happening sequentially. How can i make them to run
 parallely. Any suggestion would be helpful



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Parallel-Processing-of-messages-from-Kafka-Java-tp24284.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Streaming on Exponential Data

2015-08-14 Thread Hemant Bhanawat
What does exponential data mean? Does it mean that the amount of data
that is being received from the stream in a batch interval is
increasing exponentially as time progresses?

Does your process have enough memory to handle the data for a batch
interval?

You may want to share Spark task UI snapshots and logs.



On Thu, Aug 13, 2015 at 9:25 PM, UMESH CHAUDHARY umesh9...@gmail.com
wrote:

 Hi,
 I was working with non-reliable receiver version of Spark-Kafka streaming
 i.e.
 KafkaUtils,createStream... where for testing purpose I was getting data at
 constant rate from kafka and it was acting as expected.
 But when there was exponential data in Kafka, my program started crashing
 saying
 Cannot Compute split on input data... also I found on console logs that
 it was adding data continuously in memory while receiving from Kafka.

 How Spark Streaming behaves towards exponential data.



Re: What is the Effect of Serialization within Stages?

2015-08-13 Thread Hemant Bhanawat
A chain of map and flatmap does not cause any
serialization-deserialization.



On Wed, Aug 12, 2015 at 4:02 PM, Mark Heimann mark.heim...@kard.info
wrote:

 Hello everyone,

 I am wondering what the effect of serialization is within a stage.

 My understanding of Spark as an execution engine is that the data flow
 graph is divided into stages and a new stage always starts after an
 operation/transformation that cannot be pipelined (such as groupBy or join)
 because it can only be completed after the whole data set has been taken
 care off. At the end of a stage shuffle files are written and at the
 beginning of the next stage they are read from.

 Within a stage my understanding is that pipelining is used, therefore I
 wonder whether there is any serialization overhead involved when there is
 no shuffling taking place. I am also assuming that my data set fits into
 memory and must not be spilled to disk.

 So if I would chain multiple *map* or *flatMap* operations and they end
 up in the same stage, will there be any serialization overhead for piping
 the result of the first *map* operation as a parameter into the following
 *map* operation?

 Any ideas and feedback appreciated, thanks a lot.

 Best regards,
 Mark



Re: grouping by a partitioned key

2015-08-12 Thread Hemant Bhanawat
Inline..

On Thu, Aug 13, 2015 at 5:06 AM, Eugene Morozov fathers...@list.ru wrote:

 Hemant, William, pls see inlined.

 On 12 Aug 2015, at 18:18, Philip Weaver philip.wea...@gmail.com wrote:

 Yes, I am partitoning using DataFrameWriter.partitionBy, which produces
 the keyed directory structure that you referenced in that link.


 Have you tried to use DataFrame API instead of SQL? I mean smth like
 dataFrame.select(key).agg(count).distinct().agg(sum).
 Could you print explain for this way and for SQL you tried? I’m just
 curious of the difference.


 On Tue, Aug 11, 2015 at 11:54 PM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 As far as I know, Spark SQL cannot process data on a per-partition-basis.
 DataFrame.foreachPartition is the way.


 What do you mean by “cannot process on per-partition-basis”? DataFrame is
 an RDD on steroids.


I meant that Spark SQL cannot process the data of a single partition the way you
can with foreachPartition.



 I haven't tried it, but, following looks like a not-so-sophisticated way
 of making spark sql partition aware.


 http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery


 On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver philip.wea...@gmail.com
 wrote:

 Thanks.

 In my particular case, I am calculating a distinct count on a key that
 is unique to each partition, so I want to calculate the distinct count
 within each partition, and then sum those. This approach will avoid moving
 the sets of that key around between nodes, which would be very expensive.

 Currently, to accomplish this we are manually reading in the parquet
 files (not through Spark SQL), using a bitset to calculate the unique count
 within each partition, and accumulating that sum. Doing this through Spark
 SQL would be nice, but the naive SELECT distinct(count(...)) approach
 takes 60 times as long :). The approach I mentioned above might be an
 acceptable hybrid solution.

 - Philip


 On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru
 wrote:

 Philip,

 If all data per key are inside just one partition, then Spark will
 figure that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when
 you might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com
 wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru








 Eugene Morozov
 fathers...@list.ru







Re: grouping by a partitioned key

2015-08-12 Thread Hemant Bhanawat
As far as I know, Spark SQL cannot process data on a per-partition basis.
DataFrame.foreachPartition is the way.

I haven't tried it, but the following looks like a not-so-sophisticated way of
making Spark SQL partition-aware.

http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery
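
A minimal sketch of the hybrid approach discussed here, using mapPartitions (a close
cousin of foreachPartition that returns the partial results) so the per-partition
counts can be summed on the driver. It only gives the right answer if the key really
is unique to a partition, as Philip describes; the function and column names are
illustrative.

import org.apache.spark.sql.DataFrame

// Compute the distinct count inside each partition and sum the partial counts,
// mirroring the per-partition bitset approach described in the thread.
def summedPerPartitionDistinctCount(df: DataFrame, keyCol: String): Long =
  df.select(keyCol).rdd
    .mapPartitions { rows =>
      val seen = scala.collection.mutable.HashSet[Any]()
      rows.foreach(r => seen += r.get(0))
      Iterator.single(seen.size.toLong)   // one partial count per partition
    }
    .sum()
    .toLong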


On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver philip.wea...@gmail.com
wrote:

 Thanks.

 In my particular case, I am calculating a distinct count on a key that is
 unique to each partition, so I want to calculate the distinct count within
 each partition, and then sum those. This approach will avoid moving the
 sets of that key around between nodes, which would be very expensive.

 Currently, to accomplish this we are manually reading in the parquet files
 (not through Spark SQL), using a bitset to calculate the unique count
 within each partition, and accumulating that sum. Doing this through Spark
 SQL would be nice, but the naive SELECT distinct(count(...)) approach
 takes 60 times as long :). The approach I mentioned above might be an
 acceptable hybrid solution.

 - Philip


 On Tue, Aug 11, 2015 at 3:27 PM, Eugene Morozov fathers...@list.ru
 wrote:

 Philip,

 If all data per key are inside just one partition, then Spark will figure
 that out. Can you guarantee that’s the case?
 What is it you try to achieve? There might be another way for it, when
 you might be 100% sure what’s happening.

 You can print debugString or explain (for DataFrame) to see what’s
 happening under the hood.


 On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote:

 If I have an RDD that happens to already be partitioned by a key, how
 efficient can I expect a groupBy operation to be? I would expect that Spark
 shouldn't have to move data around between nodes, and simply will have a
 small amount of work just checking the partitions to discover that it
 doesn't need to move anything around.

 Now, what if we're talking about a parquet database created by using
 DataFrameWriter.partitionBy(...), then will Spark SQL be smart when I group
 by a key that I'm already partitioned by?

 - Philip


 Eugene Morozov
 fathers...@list.ru








Re: How to minimize shuffling on Spark dataframe Join?

2015-08-11 Thread Hemant Bhanawat
Is the source of your DataFrame partitioned on the key? As per your mail, it
looks like it is not. If that is the case, you will have to shuffle the data
anyway in order to partition it.

The other part of your question is how to co-group data from two DataFrames
based on a key. I think for RDDs, cogroup in PairRDDFunctions is the way; a
sketch follows below. I am not sure if something similar is available for
DataFrames.

Hemant
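
A minimal RDD-level sketch of that idea (file names, field layout and the partition
count are illustrative): partition both keyed RDDs with the same partitioner and
persist them, so the subsequent join or cogroup does not reshuffle either side.

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object CoPartitionedJoin {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("co-partitioned-join"))
    val partitioner = new HashPartitioner(64)

    // Shuffle each side once so both use the same partitioner, then cache.
    val students = sc.textFile("students.csv").map(_.split(","))
      .map(f => (f(0), f(1)))                       // (studentid, name)
      .partitionBy(partitioner).cache()
    val results = sc.textFile("results.csv").map(_.split(","))
      .map(f => (f(0), f(1)))                       // (studentid, gpa)
      .partitionBy(partitioner).cache()

    // Because both sides share the partitioner, the join (or a cogroup) is a
    // narrow dependency and does not reshuffle either side again.
    println(students.join(results).count())
    sc.stop()
  }
}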





On Tue, Aug 11, 2015 at 2:14 PM, Abdullah Anwar 
abdullah.ibn.an...@gmail.com wrote:



 I have two dataframes like this

   student_rdf = (studentid, name, ...)
   student_result_rdf = (studentid, gpa, ...)

 we need to join this two dataframes. we are now doing like this,

 student_rdf.join(student_result_rdf, student_result_rdf["studentid"] ==
 student_rdf["studentid"])

 So it is simple. But it creates lots of data shuffling across worker
 nodes, but as joining key is similar and if the dataframe could (understand
 the partitionkey) be partitioned using that key (studentid) then there
 suppose not to be any shuffling at all. As similar data (based on partition
 key) would reside in similar node. is it possible, to hint spark to do this?

 So, I am finding the way to partition data based on a column while I read
 a dataframe from input. And If it is possible that Spark would understand
 that two partitionkey of two dataframes are similar, then how?




 --
 Abdullah



Re: Partitioning in spark streaming

2015-08-11 Thread Hemant Bhanawat
Posting a comment from my previous mail post:

When data is received from a stream source, receiver creates blocks of
data.  A new block of data is generated every blockInterval milliseconds. N
blocks of data are created during the batchInterval where N =
batchInterval/blockInterval. A RDD is created on the driver for the blocks
created during the batchInterval. The blocks generated during the
batchInterval are partitions of the RDD.

Now if you want to repartition based on a key, a shuffle is needed.
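
A minimal sketch of such a key-based repartition (the category extractor and names
are illustrative):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// Key each record (e.g. by product category) and repartition every batch RDD
// by that key; the partitionBy inside transform is where the shuffle happens.
def partitionByCategory(events: DStream[String],
                        categoryOf: String => String,
                        numPartitions: Int): DStream[(String, String)] =
  events.map(e => (categoryOf(e), e))
    .transform(_.partitionBy(new HashPartitioner(numPartitions)))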

On Wed, Aug 12, 2015 at 4:36 AM, Mohit Anchlia mohitanch...@gmail.com
wrote:

 How does partitioning in spark work when it comes to streaming? What's the
 best way to partition a time series data grouped by a certain tag like
 categories of product video, music etc.



Re: [VOTE] Grandfathering forgotten Geode contributors

2015-06-11 Thread Hemant Bhanawat
+1

On Thu, Jun 11, 2015 at 3:37 AM, William A Rowe Jr wr...@rowe-clan.net
wrote:

 On Wed, Jun 10, 2015 at 5:03 PM, William A Rowe Jr wr...@rowe-clan.net
 wrote:

 
  If you hold a public vote to make them committers, they are not on the
  PPMC.
  If you hold a private vote, likewise.  If you hold a vote to make them
  committers
  as well as PPMC members, and send the new list of PPMC members to the
  IPMC as lazy concensus of the roster change, then they become both.  I'd
  like
  to see that happen.  These words matter in voting, and we might as well
  get
  them right every time a new committer and/or [P]PMC member is suggested.
 

 [I realize this contradicts my early comments about treating people-votes,
 any
 committee-change vote with active consensus and unanimity.  The IPMC or the
 Board (for incubating and top-level projects, respectively) do not pretend
 to know
 all of the committers to our project, unlike the project's committee
 members,
 and those names are brought up for passive approval entirely only for
 reporting
 and a bit extra scrutiny.  They realistically won't be contradicted unless
 someone
 has some seriously negative karma that the IPMC or Board are aware of, but
 the
 IPMC and Board aren't expected to '+1' each person they don't know of.]



Re: [VOTE] Grandfathering forgotten Geode contributors

2015-06-11 Thread Hemant Bhanawat
+1

On Fri, Jun 12, 2015 at 11:15 AM, Rajesh Kumar rku...@pivotal.io wrote:

 +1

 On Fri, Jun 12, 2015 at 10:43 AM, Suranjan Kumar suranjan.ku...@gmail.com
 
 wrote:

  +1
 
  On Fri, Jun 12, 2015 at 10:42 AM, Shirish Deshmukh sdeshm...@pivotal.io
 
  wrote:
 
   +1
  
   On Fri, Jun 12, 2015 at 4:45 AM, Roman Shaposhnik r...@apache.org
  wrote:
  
Hi!
   
for various reasons we missed a few Pivotal folks
when submitting a Geode proposal. There's nothing
controversial about them -- just an honest mistake.
They are all currently working on the project and
contributed quite a bit in the past. In short, they have
as much a claim in being a project committer as
all the other folks who were on the proposal.
   
I propose that we add them to the project as committers
since they should've been on the proposal to begin with:
 * Chloe Jackson
 * Manuel David
 * Rajesh Kumar
 * Rishitesh Mishra
 * Shankar Hundekar
 * Michael Stolz
 * Mak Gokhale
   
This majority vote is open for at least 72 hours.
   
Here's my +1.
   
Thanks,
Roman.
   
  
 



Re: Spark Streaming - Design considerations/Knobs

2015-05-21 Thread Hemant Bhanawat
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks
for reading and replying. However, I have a follow-up question:

I don't think I understand block replication completely. Are the
blocks replicated immediately after they are received by the receiver? Or
are they kept only on the receiver node and moved only on shuffle? Does
the replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:

 Correcting the ones that are incorrect or incomplete. BUT this is good
 list for things to remember about Spark Streaming.


 On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com
 wrote:

 Hi,

 I have compiled a list (from online sources) of knobs/design
 considerations that need to be taken care of by applications running on
 spark streaming. Is my understanding correct?  Any other important design
 consideration that I should take care of?


- A DStream is associated with a single receiver. For attaining read
parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure
that there are enough cores for processing after receiver slots are booked
i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, receiver creates blocks
of data.  A new block of data is generated every blockInterval
milliseconds. N blocks of data are created during the batchInterval where 
 N
= batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current
executor to the block managers of other executors. After that, the Network
Input Tracker running on the driver is informed about the block locations
for further processing.
- A RDD is created on the driver for the blocks created during the
batchInterval. The blocks generated during the batchInterval are 
 partitions
of the RDD. Each partition is a task in spark. blockInterval==
batchinterval would mean that a single partition is created and probably 
 it
is processed locally.

 The map tasks on the blocks are processed in the executors (one that
 received the block, and another where the block was replicated) that has
 the blocks irrespective of block interval, unless non-local scheduling
 kicks in (as you observed next).


- Having bigger blockinterval means bigger blocks. A high value of
spark.locality.wait increases the chance of processing a block on the 
 local
node. A balance needs to be found out between these two parameters to
ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can
define the number of partitions by calling dstream.repartition(n). This
reshuffles the data in RDD randomly to create n number of partitions.

 Yes, for greater parallelism. Though comes at the cost of a shuffle.


- An RDD's processing is scheduled by driver's jobscheduler as a job.
At a given point of time only one job is active. So, if one job is
executing the other jobs are queued.


- If you have two dstreams there will be two RDDs formed and there
will be two jobs created which will be scheduled one after the another.


- To avoid this, you can union two dstreams. This will ensure that a
single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
is then considered as a single job. However the partitioning of the RDDs 
 is
not impacted.

 To further clarify, the jobs depend on the number of output operations
 (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those
 output operations.

 dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }  // one Spark
 job per batch

 dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() }
 }  // TWO Spark jobs per batch

 dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd =>
 rdd.count }  // TWO Spark jobs per batch






-
- If the batch processing time is more than batchinterval then
obviously the receiver's memory will start filling up and will end up in
throwing exceptions (most probably BlockNotFoundException). Currently 
 there
is  no way to pause the receiver.

 You can limit the rate of receiver using SparkConf config
 spark.streaming.receiver.maxRate


-
- For being fully fault tolerant, spark streaming needs to enable
checkpointing. Checkpointing increases the batch processing time.

 Incomplete. There are two types of checkpointing - data and metadata.
 Only data checkpointing, needed by only some operations, increase batch
 processing time. Read -
 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
 Furthermore, with checkpoint you can recover computation, but you

Spark Streaming - Design considerations/Knobs

2015-05-20 Thread Hemant Bhanawat
Hi,

I have compiled a list (from online sources) of knobs/design considerations
that need to be taken care of by applications running on spark streaming.
Is my understanding correct?  Any other important design consideration that
I should take care of?


   - A DStream is associated with a single receiver. For attaining read
   parallelism multiple receivers i.e. multiple DStreams need to be created.
   - A receiver is run within an executor. It occupies one core. Ensure
   that there are enough cores for processing after receiver slots are booked
   i.e. spark.cores.max should take the receiver slots into account.
   - The receivers are allocated to executors in a round robin fashion.
   - When data is received from a stream source, receiver creates blocks of
   data.  A new block of data is generated every blockInterval milliseconds. N
   blocks of data are created during the batchInterval where N =
   batchInterval/blockInterval.
   - These blocks are distributed by the BlockManager of the current
   executor to the block managers of other executors. After that, the Network
   Input Tracker running on the driver is informed about the block locations
   for further processing.
   - A RDD is created on the driver for the blocks created during the
   batchInterval. The blocks generated during the batchInterval are partitions
   of the RDD. Each partition is a task in spark. blockInterval==
   batchinterval would mean that a single partition is created and probably it
   is processed locally.
   - Having bigger blockinterval means bigger blocks. A high value of
   spark.locality.wait increases the chance of processing a block on the local
   node. A balance needs to be found out between these two parameters to
   ensure that the bigger blocks are processed locally.
   - Instead of relying on batchInterval and blockInterval, you can define
   the number of partitions by calling dstream.repartition(n). This reshuffles
   the data in RDD randomly to create n number of partitions.
   - An RDD's processing is scheduled by driver's jobscheduler as a job. At
   a given point of time only one job is active. So, if one job is executing
   the other jobs are queued.
   - If you have two dstreams there will be two RDDs formed and there will
   be two jobs created which will be scheduled one after the another.
   - To avoid this, you can union two dstreams. This will ensure that a
   single unionRDD is formed for the two RDDs of the dstreams. This unionRDD
   is then considered as a single job. However the partitioning of the RDDs is
   not impacted.
   - If the batch processing time is more than the batch interval then obviously
   the receiver's memory will start filling up and will end up throwing
   exceptions (most probably BlockNotFoundException). Currently there is no
   way to pause the receiver.
   - For being fully fault tolerant, spark streaming needs to enable
   checkpointing. Checkpointing increases the batch processing time.
   - The frequency of metadata checkpoint cleaning can be controlled using
   spark.cleaner.ttl. But, data checkpoint cleaning happens automatically when
   the RDDs in the checkpoint are no longer required.



Thanks,
Hemant


Re: Regarding hsync

2013-07-11 Thread Hemant Bhanawat
Hi, 

Any help? 

Thanks in advance, 
Hemant 

- Original Message -

From: Hemant Bhanawat hema...@vmware.com 
To: hdfs-dev@hadoop.apache.org 
Sent: Tuesday, July 9, 2013 12:55:23 PM 
Subject: Regarding hsync 

Hi, 

I am currently working on hadoop version 2.0.*. 

Currently, hsync does not update the file size on namenode. So, if my process 
dies after calling hsync but before calling file close, the file is left with 
an inconsistent file size. I would like to fix this file size. Is there a way 
to do that? A workaround that I have come across is to open the file stream in 
append mode and close it. This fixes the file size on the namenode. Is it a 
reliable solution? 

Thanks, 
Hemant 



Regarding hsync

2013-07-09 Thread Hemant Bhanawat
Hi, 

I am currently working on hadoop version 2.0.*. 

Currently, hsync does not update the file size on namenode. So, if my process 
dies after calling hsync but before calling file close, the file is left with 
an inconsistent file size. I would like to fix this file size. Is there a way 
to do that? A workaround that I have come across is to open the file stream in 
append mode and close it. This fixes the file size on the namenode. Is it a 
reliable solution? 

Thanks, 
Hemant 


Partially written SequenceFile

2013-07-04 Thread Hemant Bhanawat


Hi, 

I am working on 2.0.2-alpha version of Hadoop. I am currently writing my key 
value pairs on HDFS in a sequence file. I regularly flush my data using hsync() 
because the process that is writing to the file can terminate abruptly. My 
requirement is that once my hsync() is successful, my data that was written 
before hsync() should still be available. 

To verify this, I carried out a test that killed the process (that was writing
to a SequenceFile) after it did an hsync(). Now when I read the data
using the hadoop fs -cat command, I can see the data, but the size of the file is 0.
Also, SequenceFile.Reader.next(key, value) returns false. I read somewhere
that since the file was not closed properly, its size was not updated with the
namenode, and for the same reason next() returns false.

To fix this and to enable reading of the file using the SequenceFile APIs, I opened the
file stream in append mode and then closed it immediately. This fixed the
size of the file. While doing this, I retry if I receive a RecoveryInProgress or
AlreadyBeingCreated exception. Now I can successfully read the data using
SequenceFile.Reader. Following is the code that I am using.


*** WRITE THREAD ***

writer = SequenceFile.createWriter(fs, conf, path, value.getClass(),
    value.getClass(), CompressionType.NONE);
writer.append(new Text("India"), new Text("Delhi"));
writer.append(new Text("China"), new Text("Beijing"));
writer.hsync();
// BOOM, FAILURE, PROCESS TERMINATED

*** I expect that India and China should be available, but next() returns false ***

*** Code to fix the file size ***

while (true) {
    try {
        FileSystem fs = FileSystem.get(namenodeURI, conf);
        Path path = new Path(uri);
        FSDataOutputStream open = fs.append(path);
        fs.close();
        break;
    } catch (RecoveryInProgressException e) {
        // lease recovery still in progress on the namenode - retry
    } catch (AlreadyBeingCreatedException e) {
        // lease not yet released for the file - retry
    } catch (Exception e) {
        break;
    }
}


Would it be possible for you to let me know if this approach has any 
shortcomings or if there are any other better alternatives available? 

Thanks, 
Hemant Bhanawat 


HBase and MapReduce

2012-05-23 Thread Hemant Bhanawat
I have couple of questions related to MapReduce over HBase

 

1. HBase guarantees data locality between store files and the RegionServer only if
the RegionServer stays up for long. If there are too many region movements or the
server has been recycled recently, there is a high probability that the store file
blocks are not local to the region server. But getSplits always returns the
RegionServer of the StoreFile. So in this scenario, does MapReduce lose its data
locality?

 

2. As getSplits returns only the RegionServer, the MR job is not aware
of the multiple replicas of the StoreFile block. It only accesses one
block (which is local if the point above does not apply). This can
constrain the MR processing, as you cannot distribute the data processing
in the best possible manner. Is this correct?

 

3. A guess - since the MR processing goes through the RegionServer, it may
impact the RegionServer performance for other random operations? 

 

Thanks in advance,

Hemant