Unsubscribe

2023-12-12 Thread Sergey Boytsov
Unsubscribe


2023-12-11 Thread Sergey Boytsov
-- 

Sergei Boitsov

JetBrains GmbH
Christoph-Rapparini-Bogen 23
80639 München
Handelsregister: Amtsgericht München, HRB 187151
Geschäftsführer: Yury Belyaev


Re: [Building] Building with JDK11

2022-07-17 Thread Sergey B.
Hi Steve,

Can you shed some light on why they need $JAVA_HOME at all if everything is
already in place?

Regards,
- Sergey

On Mon, Jul 18, 2022 at 4:31 AM Stephen Coy 
wrote:

> Hi Szymon,
>
> There seems to be a common misconception that setting JAVA_HOME will set
> the version of Java that is used.
>
> This is not true, because in most environments you need to have a PATH
> environment variable set up that points at the version of Java that you
> want to use.
>
> You can set JAVA_HOME to anything at all and `java -version` will always
> return the same result.
>
> The way that you configure PATH varies from OS to OS:
>
>
>- On macOS use `/usr/libexec/java_home -v11`
>- On Linux use `sudo alternatives --config java`
>- On Windows I have no idea
>
>
> When you do this, the `mvn` command will compute the value of JAVA_HOME for
> its own use; there is no need to explicitly set it yourself (unless other
> Java applications that you use insist upon it).
>
>
> Cheers,
>
> Steve C
>
> On 16 Jul 2022, at 7:24 am, Szymon Kuryło  wrote:
>
> Hello,
>
> I'm trying to build a Java 11 Spark distro using the
> dev/make-distribution.sh script.
> I have set JAVA_HOME to point to the JDK 11 location, and I've also set the
> java.version property in pom.xml to 11, effectively also setting
> `maven.compiler.source` and `maven.compiler.target`.
> When inspecting classes from the `dist` directory with `javap -v`, I find
> that the class major version is 52, which is specific to JDK8. Am I missing
> something? Is there a reliable way to set the JDK used in the build process?
>
> Thanks,
> Szymon K.
>
>
>


Re: Spark sql slowness in Spark 3.0.1

2022-04-14 Thread Sergey B.
The suggestion is to check:

1. Used format for write
2. Used parallelism
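
For example, a minimal PySpark sketch of checking both points (the table name,
output path and partition count below are purely illustrative, not taken from
this thread):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("write-check").getOrCreate()
df = spark.table("intermediate_data")  # hypothetical checkpointed dataframe

# 1. The write produces one task (and one output file) per partition,
#    so the current partition count is the effective write parallelism.
print(df.rdd.getNumPartitions())

# 2. Choose the format and parallelism explicitly instead of relying on
#    defaults; 200 is only a placeholder value to tune against the data volume.
(df.repartition(200)
   .write
   .format("parquet")
   .mode("overwrite")
   .save("s3a://bucket/checkpoints/intermediate_data"))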

On Thu, Apr 14, 2022 at 7:13 PM Anil Dasari  wrote:

> Hello,
>
>
>
> We are upgrading Spark from 2.4.7 to 3.0.1. We use Spark SQL (Hive) to
> checkpoint data frames (intermediate data). DF write is very slow in 3.0.1
> compared to 2.4.7.
>
> We have read the release notes and there were no major changes except managed
> tables and adaptive scheduling. We are not using adaptive scheduling and are
> going with the default config. We made changes to handle managed tables by
> adding explicit paths during writes and deletes.
>
> Do you have any suggestions to debug and fix the slowness problem?
>
>
>
> Thanks,
>
>
>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Hi Sean,

According to the plan I’m observing, this is what happens indeed. There’s an
exchange operation that sends data to a single partition/task in the toPandas() +
PyArrow enabled case.

> On 12 Nov 2021, at 16:31, Sean Owen  wrote:
> 
> Yes, none of the responses are addressing your question.
> I do not think it's a bug necessarily; do you end up with one partition in 
> your execution somewhere?
> 
> On Fri, Nov 12, 2021 at 3:38 AM Sergey Ivanychev  <mailto:sergeyivanyc...@gmail.com>> wrote:
> Of course if I give 64G of ram to each executor they will work. But what’s 
> the point? Collecting results in the driver should cause a high RAM usage in 
> the driver and that’s what is happening in collect() case. In the case where 
> pyarrow serialization is enabled all the data is being collected on a single 
> executor, which is clearly a wrong way to collect the result on the driver.
> 
> I guess I’ll open an issue about it in Spark Jira. It clearly looks like a 
> bug.
> 
>> On 12 Nov 2021, at 11:59, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>> 
>> OK, your findings do not imply those settings are incorrect. Those settings 
>> will work if you set-up your k8s cluster in peer-to-peer mode with equal 
>> amounts of RAM for each node which is common practice.
>> 
>> HTH
>> 
>> 
>> 
>> 
>> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev > <mailto:sergeyivanyc...@gmail.com>> wrote:
>> Yes, in fact those are the settings that cause this behaviour. If set to 
>> false, everything goes fine since the implementation in spark sources in 
>> this case is
>> 
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>> 
>>> 
>>> Have you tried the following settings:
>>> 
>>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh >> <mailto:mich.talebza...@gmail.com>> wrote:
>>> Ok so it boils down on how spark does create toPandas() DF under the 
>>> bonnet. How many executors are involved in k8s cluster. In this model spark 
>>> will create executors = no of nodes - 1
>>> 
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev >> <mailto:sergeyivanyc...@gmail.com>> wrote:
>>> > Just to confirm with Collect() alone, this is all on the driver?
>>> 
>>> I shared the screenshot with the plan in the first email. In the collect() 
>>> case the data gets fetched to the driver without problems.
>>> 
>>> Best regards,
>>> 
>>> 
>>> Sergey Ivanychev
>>> 
>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>> 
>>> 
>>>> Just to confirm with Collect() alone, this is all on the driver?
> 



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-12 Thread Sergey Ivanychev
Of course if I give 64G of ram to each executor they will work. But what’s the 
point? Collecting results in the driver should cause a high RAM usage in the 
driver and that’s what is happening in collect() case. In the case where 
pyarrow serialization is enabled all the data is being collected on a single 
executor, which is clearly a wrong way to collect the result on the driver.

I guess I’ll open an issue about it in Spark Jira. It clearly looks like a bug.

> On 12 Nov 2021, at 11:59, Mich Talebzadeh  wrote:
> 
> OK, your findings do not imply those settings are incorrect. Those settings 
> will work if you set-up your k8s cluster in peer-to-peer mode with equal 
> amounts of RAM for each node which is common practice.
> 
> HTH
> 
> 
> 
> 
> On Thu, 11 Nov 2021 at 21:39, Sergey Ivanychev  <mailto:sergeyivanyc...@gmail.com>> wrote:
> Yes, in fact those are the settings that cause this behaviour. If set to 
> false, everything goes fine since the implementation in spark sources in this 
> case is
> 
> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
> 
> Best regards,
> 
> 
> Sergey Ivanychev
> 
>> On 11 Nov 2021, at 13:58, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>> 
>> 
>> Have you tried the following settings:
>> 
>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>> 
>> HTH
>> 
>> 
>> 
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh > <mailto:mich.talebza...@gmail.com>> wrote:
>> Ok so it boils down on how spark does create toPandas() DF under the bonnet. 
>> How many executors are involved in k8s cluster. In this model spark will 
>> create executors = no of nodes - 1
>> 
>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev > <mailto:sergeyivanyc...@gmail.com>> wrote:
>> > Just to confirm with Collect() alone, this is all on the driver?
>> 
>> I shared the screenshot with the plan in the first email. In the collect() 
>> case the data gets fetched to the driver without problems.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>> 
>> 
>>> Just to confirm with Collect() alone, this is all on the driver?



Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Hi Gourav,

Please, read my question thoroughly. My problem is with the plan of the 
execution and with the fact that toPandas collects all the data not on the 
driver but on an executor, not with the fact that there’s some memory overhead.

I don’t understand how your excerpts answer my question. The chapters you’ve 
shared describe that serialization is costly, that workers can fail due to the 
memory constraints and inter-language serialization.

This is irrelevant to my question — building pandas DataFrame using Spark’s 
collect() works fine and this operation itself involves much deserialization of 
Row objects.

Best regards,


Sergey Ivanychev

> On 12 Nov 2021, at 05:05, Gourav Sengupta  wrote:
> 
> Hi Sergey,
> 
> Please read the excerpts from the book of Dr. Zaharia that I had sent, they 
> explain these fundamentals clearly.
> 
> Regards,
> Gourav Sengupta
> 
> On Thu, Nov 11, 2021 at 9:40 PM Sergey Ivanychev  
> wrote:
>> Yes, in fact those are the settings that cause this behaviour. If set to 
>> false, everything goes fine since the implementation in spark sources in 
>> this case is
>> 
>> pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>> On 11 Nov 2021, at 13:58, Mich Talebzadeh  wrote:
>>> 
>>> Have you tried the following settings:
>>> 
>>> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
>>> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh  
>>> wrote:
>>>> Ok so it boils down on how spark does create toPandas() DF under the 
>>>> bonnet. How many executors are involved in k8s cluster. In this model 
>>>> spark will create executors = no of nodes - 1
>>>> 
>>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev  
>>>> wrote:
>>>>> > Just to confirm with Collect() alone, this is all on the driver?
>>>>> 
>>>>> I shared the screenshot with the plan in the first email. In the 
>>>>> collect() case the data gets fetched to the driver without problems.
>>>>> 
>>>>> Best regards,
>>>>> 
>>>>> 
>>>>> Sergey Ivanychev
>>>>> 
>>>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh  wrote:
>>>>> 
>>>>>> Just to confirm with Collect() alone, this is all on the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-11 Thread Sergey Ivanychev
Yes, in fact those are the settings that cause this behaviour. If set to false, 
everything goes fine since the implementation in spark sources in this case is

pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
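
A minimal PySpark sketch of that workaround, assuming the goal is simply to avoid
the Arrow-enabled single-executor exchange (the table name and limit are
illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Fall back to the plain collect()-based toPandas() by switching Arrow off
# just for this conversion.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
pdf = spark.table("some_table").limit(100).toPandas()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")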

Best regards,


Sergey Ivanychev

> On 11 Nov 2021, at 13:58, Mich Talebzadeh  wrote:
> 
> 
> Have you tried the following settings:
> 
> spark.conf.set("spark.sql.execution.arrow.pysppark.enabled", "true")
> spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled","true")
> 
> HTH
> 
> 
> 
>> On Thu, 4 Nov 2021 at 18:06, Mich Talebzadeh  
>> wrote:
>> Ok so it boils down on how spark does create toPandas() DF under the bonnet. 
>> How many executors are involved in k8s cluster. In this model spark will 
>> create executors = no of nodes - 1
>> 
>>> On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev  
>>> wrote:
>>> > Just to confirm with Collect() alone, this is all on the driver?
>>> 
>>> I shared the screenshot with the plan in the first email. In the collect() 
>>> case the data gets fetched to the driver without problems.
>>> 
>>> Best regards,
>>> 
>>> 
>>> Sergey Ivanychev
>>> 
>>>> On 4 Nov 2021, at 20:37, Mich Talebzadeh  wrote:
>>>> 
>>> 
>>>> Just to confirm with Collect() alone, this is all on the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> did you get to read the excerpts from the book of Dr. Zaharia?

I read what you have shared but didn’t manage to get your point.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:38, Gourav Sengupta  wrote:
> 
> did you get to read the excerpts from the book of Dr. Zaharia?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> Just to confirm with Collect() alone, this is all on the driver?

I shared the screenshot with the plan in the first email. In the collect() case 
the data gets fetched to the driver without problems.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:37, Mich Talebzadeh  wrote:
> 
> Just to confirm with Collect() alone, this is all on the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I’m sure that it’s running in client mode. I don’t want to have the same amount 
of RAM on drivers and executors since there’s no point in giving 64G of ram to 
executors in my case.

My question is why collect and toPandas actions produce so different plans, 
which cause toPandas to fail on executors.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 15:17, Mich Talebzadeh  wrote:
> 
> 
> 
> From your notes ".. IIUC, in the `toPandas` case all the data gets shuffled 
> to a single executor that fails with OOM, which doesn’t happen in `collect` 
> case. Does it work like that? How do I collect a large dataset that fits 
> into memory of the driver?"
> 
> The acid test would be to use pandas and ensure that all nodes have the same 
> amount of RAM. The assumption here is that the master node has a larger 
> amount of RAM that in theory should handle the work for Jupyter with Pandas. 
> You can easily find out which mode Spark is deployed in by looking at the 
> Spark GUI page.
> 
> HTH
> 
> 
> 
> 
> 
>> On Thu, 4 Nov 2021 at 11:25, Sergey Ivanychev  
>> wrote:
>> I will follow up with the output, but I suppose Jupyter runs in client mode 
>> since it’s created via getOrCreate with a K8s api server as master.
>> Also note that I tried both “collect” and “toPandas” in the same conditions 
>> (Jupyter client mode), so IIUC your theory doesn’t explain that difference 
>> in execution plans.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> On 4 Nov 2021, at 13:12, Mich Talebzadeh  wrote:
>>>> 
>>> 
>>> Do you have the output for executors from spark GUI, the one that 
>>> eventually ends up with OOM?
>>> 
>>> Also what does 
>>> 
>>> kubectl get pods -n $NAMESPACE 
>>> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print 
>>> $1}'`
>>> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
>>> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
>>> 
>>> say?
>>> 
>>> My guess is that Jupyter notebook like Zeppelin notebook does a two stage 
>>> spark-submit under the bonnet. The job starts on the driver where the 
>>> Jupyter notebook is on but the actual job runs on the cluster itself in 
>>> cluster mode. If your assertion is right (executors don't play much of a 
>>> role), just run the whole thing in local mode!
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev  
>>>> wrote:
>>>> I want to further clarify the use case I have: an ML engineer collects 
>>>> data so as to use it for training an ML model. The driver is created 
>>>> within a Jupyter notebook and has 64G of RAM for fetching the training set 
>>>> and feeding it to the model. Naturally, in this case executors shouldn’t 
>>>> be as big as the driver.
>>>> 
>>>> Currently, the best solution I found is to write the dataframe to S3, and 
>>>> then read it via pd.read_parquet.
>>>> 
>>>> Best regards,
>>>> 
>>>> 
>>>> Sergey Ivanychev
>>>> 
>>>>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh  wrote:
>>>>>> 
>>>>> 
>>>>> Thanks for clarification on the koalas case.
>>>>> 
>>>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all 
>>>>> the data gets shuffled to a single executor that fails with OOM, 
>>>>> 
>>>>> I still believe that this may be related to the way k8s handles 
>>>>> shuffling. In a 

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I will follow up with the output, but I suppose Jupyter runs in client mode 
since it’s created via getOrCreate with a K8s api server as master.
Also note that I tried both “collect” and “toPandas” in the same conditions 
(Jupyter client mode), so IIUC your theory doesn’t explain that difference in 
execution plans.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 13:12, Mich Talebzadeh  wrote:
> 
> 
> Do you have the output for executors from spark GUI, the one that eventually 
> ends up with OOM?
> 
> Also what does 
> 
> kubectl get pods -n $NAMESPACE 
> DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE |grep driver|awk '{print $1}'`
> kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
> kubectl logs $EXECUTOR_WITH_OOM  -n $NAMESPACE
> 
> say?
> 
> My guess is that Jupyter notebook like Zeppelin notebook does a two stage 
> spark-submit under the bonnet. The job starts on the driver where the Jupyter 
> notebook is on but the actual job runs on the cluster itself in cluster mode. 
> If your assertion is right (executors don't play much of a role), just run 
> the whole thing in local mode!
> 
> HTH
> 
> 
> 
>> On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev  
>> wrote:
>> I want to further clarify the use case I have: an ML engineer collects data 
>> so as to use it for training an ML model. The driver is created within 
>> a Jupyter notebook and has 64G of RAM for fetching the training set and 
>> feeding it to the model. Naturally, in this case executors shouldn’t be as 
>> big as the driver.
>> 
>> Currently, the best solution I found is to write the dataframe to S3, and 
>> then read it via pd.read_parquet.
>> 
>> Best regards,
>> 
>> 
>> Sergey Ivanychev
>> 
>>>> On 4 Nov 2021, at 00:18, Mich Talebzadeh  wrote:
>>>> 
>>> 
>>> Thanks for clarification on the koalas case.
>>> 
>>> The thread owner states and I quote: .. IIUC, in the `toPandas` case all 
>>> the data gets shuffled to a single executor that fails with OOM, 
>>> 
>>> I still believe that this may be related to the way k8s handles shuffling. 
>>> In a balanced k8s cluster this could be avoided which does not seem to be 
>>> the case here as the so called driver node has 8 times more RAM than the 
>>> other nodes. 
>>> 
>>> HTH
>>> 
>>> 
>>> 
>>>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>>>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>>>> unrelated to toPandas(), nor to the question of how it differs from 
>>>> collect().
>>>> Shuffle is also unrelated.
>>>> 
>>>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh 
>>>>>  wrote:
>>>>> Hi,
>>>>> 
>>>>> As I understood in the previous versions of Spark the data could not be 
>>>>> processed and stored in Pandas data frames in a distributed mode as these 
>>>>> data frames store data in RAM which is the driver in this case. 
>>>>> However, I was under the impression that this limitation no longer exists 
>>>>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 
>>>>> 8GB of RAM for others, and PySpark running in cluster mode,  how do you 
>>>>> expect the process to confine itself to the master node? What will happen 
>>>>> if you increase executor node(s) RAM to 64GB temporarily (balanced k8s 
>>>>> cluster) and run the job again?
>>>>> 
>>>>> Worth noting that the current Spark on k8s  does not support external 
>>>>> shuffle. For now we have two parameters for Dynamic Resource All

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I want to further clarify the use case I have: an ML engineer collects data so 
as to use it for training an ML model. The driver is created within a Jupyter 
notebook and has 64G of RAM for fetching the training set and feeding it to the 
model. Naturally, in this case executors shouldn’t be as big as the driver.

Currently, the best solution I found is to write the dataframe to S3, and then 
read it via pd.read_parquet.
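
A minimal sketch of that workaround (the bucket, paths and table name are made up
for illustration; reading with pandas assumes pyarrow/s3fs are available to the
notebook):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Let the executors write the training set out in parallel...
spark.table("training_data").write.mode("overwrite") \
    .parquet("s3a://my-bucket/training-set.parquet")

# ...then pull it into the notebook's pandas process, bypassing toPandas().
pdf = pd.read_parquet("s3://my-bucket/training-set.parquet")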

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 00:18, Mich Talebzadeh  wrote:
> 
> 
> Thanks for clarification on the koalas case.
> 
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all the 
> data gets shuffled to a single executor that fails with OOM, 
> 
> I still believe that this may be related to the way k8s handles shuffling. In 
> a balanced k8s cluster this could be avoided which does not seem to be the 
> case here as the so called driver node has 8 times more RAM than the other 
> nodes. 
> 
> HTH
> 
> 
> 
>> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>> I think you're talking about koalas, which is in Spark 3.2, but that is 
>> unrelated to toPandas(), nor to the question of how it differs from 
>> collect().
>> Shuffle is also unrelated.
>> 
>>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh  
>>> wrote:
>>> Hi,
>>> 
>>> As I understood in the previous versions of Spark the data could not be 
>>> processed and stored in Pandas data frames in a distributed mode as these 
>>> data frames store data in RAM which is the driver in this case. 
>>> However, I was under the impression that this limitation no longer exists 
>>> in 3.2? So if you have a k8s cluster with 64GB of RAM for one node and 8GB 
>>> of RAM for others, and PySpark running in cluster mode,  how do you expect 
>>> the process to confine itself to the master node? What will happen if you 
>>> increase executor node(s) RAM to 64GB temporarily (balanced k8s cluster) 
>>> and run the job again?
>>> 
>>> Worth noting that the current Spark on k8s  does not support external 
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation. 
>>> These are 
>>> 
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>  
>>> The idea is to use dynamic resource allocation where the driver tracks the 
>>> shuffle files and evicts only executors not storing active shuffle files. 
>>> So in a nutshell these shuffle files are stored in the executors themselves 
>>> in the absence of the external shuffle. The model works on the basis of the 
>>> "one-container-per-Pod" model  meaning that for each node of the cluster 
>>> there will be one node running the driver and each remaining node running 
>>> one executor each. 
>>> 
>>> 
>>> HTH
>>> 


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-03 Thread Sergey Ivanychev
I’m pretty sure

WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 20) (10.20.167.28 executor 
2): java.lang.OutOfMemoryError
at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
If you look at the «toPandas» you can see the exchange stage that doesn’t occur 
in the «collect» example. I suppose it tries to pull all the data to a single 
executor for some reason.

> On 3 Nov 2021, at 21:34, Sean Owen  wrote:
> 
> No, you can collect() a DataFrame. You get Rows. collect() must create an 
> in-memory representation - how else could it work? Those aren't differences.
> Are you sure it's the executor that's OOM, not the driver? 
> 
> On Wed, Nov 3, 2021 at 1:32 PM Gourav Sengupta  <mailto:gourav.sengu...@gmail.com>> wrote:
> Hi,
> 
> I might be wrong but toPandas() works with dataframes, where as collect works 
> at RDD. Also toPandas() converts to Python objects in memory I do not think 
> that collect does it.
> 
> Regards,
> Gourav
> 
> On Wed, Nov 3, 2021 at 2:24 PM Sergey Ivanychev  <mailto:sergeyivanyc...@gmail.com>> wrote:
> Hi, 
> 
> Spark 3.1.2 K8s.
> 
> I encountered OOM error while trying to create a Pandas DataFrame from Spark 
> DataFrame. My Spark driver has 60G of ram, but the executors are tiny 
> compared to that (8G)
> 
> If I do `spark.table(…).limit(100).collect()` I get the following plan
> 
> <2021-11-03_17-07-55.png>
> 
> 
> If I do `spark.table(…).limit(100).toPandas()` I get a more complicated 
> plan with an extra shuffle
> 
> <2021-11-03_17-08-31.png>
> 
> IIUC, in the `toPandas` case all the data gets shuffled to a single executor 
> that fails with OOM, which doesn’t happen in `collect` case. Does it 
> work like that? How do I collect a large dataset that fits into memory of the 
> driver?



Using Spark as a fail-over platform for Java app

2021-03-12 Thread Sergey Oboguev
I have an existing plain-Java (non-Spark) application that needs to run in
a fault-tolerant way, i.e. if the node crashes then the application is
restarted on another node, and if the application crashes because of
internal fault, the application is restarted too.

Normally I would run it in a Kubernetes, but in this specific case
Kubernetes is unavailable because of organizational/bureaucratic issues,
and the only execution platform available in the domain is Spark.

Is it possible to wrap the application into a Spark-based launcher that
will take care of executing the application and restarts?

Execution must be in a separate JVM, apart from other apps.

And for optimum performance, the application also needs to be assigned
guaranteed resources, i.e. the number of cores and amount of RAM required
for it, so it would be great if the launcher could take care of this too.
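
One hedged sketch of such a wrapper (not an established pattern, just an
illustration of the idea in PySpark): run the application as the single task of a
one-partition job, so Spark's normal task/executor fault tolerance restarts it,
while the usual executor core/memory settings provide the resource guarantees.
The command line below is purely illustrative.

import subprocess
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("java-app-wrapper").getOrCreate()
sc = spark.sparkContext

def run_app(_):
    # Launch the plain-Java application inside the single executor's JVM/container.
    subprocess.run(["java", "-jar", "/opt/app/my-app.jar"], check=True)

# A one-element, one-partition RDD pins the app to exactly one task; if the task
# or its node dies, Spark reschedules it (subject to spark.task.maxFailures).
sc.parallelize([0], numSlices=1).foreach(run_app)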

Thanks for advice.


Detecting latecomer events in Spark structured streaming

2021-03-08 Thread Sergey Oboguev
I have a Spark structured streaming based application that performs
window(...) construct followed by aggregation.

This construct discards latecomer events that arrive past the watermark. I
need to be able to detect these late events to handle them out-of-band.
The application maintains a raw store of all received events and can
re-aggregate a particular time interval for a particular device in a
secondary batch mode, as long as it knows that this interval has to be
re-aggregated, i.e. contains latecomer data that was discarded by
structured streaming due to watermark.

I am trying to come up with a way to perform such a detection.

One approach would perhaps be to insert an additional stage before
window(...) -- a stage that would monitor all events received by the
stream, look at their timestamps, and predict which events will be
discarded by window(...) due to watermark. Such events can then be handled
outside of Spark structured streaming. The stage can be based on
Dataset.foreach, Dataset.filter or Dataset.map that will pass all events
through, but also monitor the events and if a latecomer condition is
detected, then issue a side channel notification that will cause data for
the specified device and interval be re-aggregated later from raw event
storage, out of stream.
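
A rough PySpark sketch of that pass-through monitoring stage (the original
application is Java; the rate source, delay and notification below are
placeholders). It also makes visible the per-worker-only view raised in Q1 below,
since the running maximum lives in each worker process:

from datetime import timedelta
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.getOrCreate()
DELAY = timedelta(minutes=10)

_state = {"max_ts": None}  # process-local approximation of the watermark

def monitor(ts):
    if _state["max_ts"] is None or ts > _state["max_ts"]:
        _state["max_ts"] = ts
    elif ts < _state["max_ts"] - DELAY:
        # Side-channel notification (illustrative): record this device/interval
        # for later out-of-band re-aggregation from the raw event store.
        print("latecomer detected at", ts)
    return True  # pass every event through unchanged

monitor_udf = F.udf(monitor, BooleanType())

events = spark.readStream.format("rate").load()            # placeholder source
watched = events.filter(monitor_udf(F.col("timestamp")))   # monitoring stage

aggregated = (watched.withWatermark("timestamp", "10 minutes")
              .groupBy(F.window("timestamp", "2 minutes"))
              .count())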

I have a couple of questions related to the feasibility of such a construct.

Q1:

Can data behind the window(...) be shared by multiple executors or nodes,
or is it owned by one executor at any given time? If it is shared, it would
appear that local monitoring of passing timestamps would be insufficient,
since it lacks global context.

Q2:

To monitor the stream, the stage needs to maintain a context. The context
can be checkpointed periodically in an external store, but I do not want to
persist/readback the context for every microbatch (or, in the foreach case,
for every individual event). I want to checkpoint the context infrequently,
and maintain it across microbatches just in memory.

Which brings a question... The handler function inside the stage (called by
foreach, map, or filter) needs to refer to the context object, yet it is
unclear how to make such a reference.

I could attach a context to the stream via some global map object
(translating stream->context), but handler functions for Dataset.foreach,
Dataset.map, or Dataset.filter do not receive a stream handle, and thus
have no key to use for translation back to context object.

The association cannot be done via a TLS (per-thread) variable too, since
Spark can internally create threads for stream processing and they won't
inherit the parent TLS (and also may not even have the thread that started
the stream as their parent thread).

This appears to leave Java static variable as the only option for the
context pointer, limiting the model to one active stream per executor. But
is it guaranteed by Spark specification that different executors will run
in different JVM instances?

Thanks for advice.


Controlling Spark StateStore retention

2021-02-20 Thread Sergey Oboguev
 I am trying to write a Spark Structured Streaming application consisting
of GroupState construct followed by aggregation.

Events arriving from event sources are bucketized by deviceId and quantized
timestamp, composed together into group state key idTime.

Logical plan consists of stages (in the order of data flow):

FlatMapGroupsWithState
Aggregate


and translates to physical plan (in the same order)

FlatMapGroupsWithState
SerializeFromObject
Project (idTime, timestamp)
HashAggregate(keys=[idTime] ... partial aggregation)
Exchange hashpartitioning(idTime)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreRestore [idTime], state info)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreSave [idTime], state info)
HashAggregate(keys=[idTime], functions= ...)


This all works, but it appears that partial aggregate state does not ever
get released.

If I send 10 events for some value of idTime, the stream produces an output
batch with count = 10.

If some significant time later (after group state expires) I send 10 more
events for the some value of idTime, the stream produces another output
batch with count = 20. Other aggregates also reflect that both old and new
events were reflected in this subsequent aggregation output batch.

Thus, it appears state information is not cleared from the state store.

This is nice from the standpoint of handling latecomer events, but also
poses a problem: if partial aggregate information per every idTime value is
never cleared from the state store, the state store eventually is going to
run out of space.

Is there a way to control this retention and trigger the release of state
store data for old values idTime, no longer needed?
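
For reference, the usual knob for releasing streaming-aggregation state is an
event-time watermark on the aggregated time column; a minimal PySpark sketch of
the idea (the rate source, window and delay are illustrative stand-ins for the
idTime bucketing described above):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
events = spark.readStream.format("rate").load()   # placeholder event source

# With a watermark, Spark can drop partial-aggregation state for windows older
# than (max observed event time - delay) instead of keeping it forever.
aggregated = (events
              .withWatermark("timestamp", "30 minutes")
              .groupBy(F.window("timestamp", "1 minute"))
              .count())

query = (aggregated.writeStream
         .outputMode("append")
         .format("console")
         .start())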

Thanks for advice.


Re: Excessive disk IO with Spark structured streaming

2020-10-07 Thread Sergey Oboguev
e snapshot files (only 2
snapshot files in the whole run), and 4 delta files per microbatch.
Of local-dir files: 1 temp_shuffle files per microbatch (as expected) and 2
other files (shuffle.data+shuffle.index).

..

Thus Spark SS seems to keep 4 delta files per microbatch per shuffle
partition, no matter what is the number of shuffle partitions.
Why would it have to do this?

Also unsure why Spark has to create local-dir files per every microbatch,
rather than keeping them open across microbatches and re-using from one
microbatch to another (writing data over, but without having to go through
file creation)...

*> Spark leverages HDFS API which is configured to create crc file per file
by default. *


This is unfortunate. Much better would be to create checkpoint files with
HDFS CRC "shadow file" disabled, and having instead CRC (if desired) right
inside the main file itself, rather than as a separate file.

* * *

While we are at it, I wanted to ask a related question. Suppose I create
multiple parallel streaming pipelines in the application, by pipeline
meaning the whole data stream from initial Dataset/DatasetReader to the
output of Spark query. Suppose I have multiple parallel pipelines in the
application, a large number, let us say dozens or hundreds.

How would Spark process them in terms of threading model? Will there be a
separate thread per active stream/query or does Spark use a bounded thread
pool? Do many streams/queries result in many threads, or a limited number
of threads?

Thanks,
Sergey


Re: Excessive disk IO with Spark structured streaming

2020-10-05 Thread Sergey Oboguev
 Hi Jungtaek,

Thanks for your response.

*> you'd want to dive inside the checkpoint directory and have separate
numbers per top-subdirectory*

All the checkpoint store numbers are solely for the subdirectory set by
option("checkpointLocation", .. checkpoint dir for writer ... )

Other subdirectories are empty or nearly-empty.

*> First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in checkpoint directory would occur
similarly per micro-batch*

There were 69 microbatches, each containing 13,334 events.

Event's Row object size in Java heap is 620 bytes, thus the total amount of
data in a microbatch (in terms of aggregate Java-heap objects sizes) is 8.3
MB.

Average number of system calls per microbatch was:

For query (writer) checkpoint directory:

create/open file = 877
mkdir = 826
readlink = 855
unlink = 607
rename = 213
execve readlink = 5116
execve chmod = 4937

For Spark local directory:

create/open file = 797
unlink = 200
mmap = 197
stat = 2391

(The number for local.stat in the previous message was incorrect).

Physical processing time per microbatch was 3.4 seconds.

That's to store a mere 8.3 MB of uncompressed (Java-heap) data!

Most created "delta" files have file size in the order of 1 KB or less.
"Snapshot" files are several KB in size.
One would think that the tiny size of created files is one key factor in
dismal performance. It causes a very high number of system calls and also
hugely fragments actual data IO.

As a result, using iostat, typical disk write rate was observed only ~ 100
KB/s.
(Read rate was near-zero, presumably because all data was in Linux block
cache.)

Average CPU usage when ingesting data was in the order of 600% (i.e. 6
cores busy), I presume chiefly for serialization/deserialization, even
though Kryo was enabled. But the machine has 16 cores (VCPUs), so the most
immediate limiting factor must have been not CPU saturation but IO latency
(unless there is some obscure setting limiting the number of
reading/writing threads). The latency arising, fundamentally, out of very
large number of tiny files.

Is there a way to control the size of checkpoint "delta" and "snapshot"
files Spark creates, to make them larger?
And the same also for the files in Spark local directory?
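
Not a direct size control, but one related knob worth noting: each stateful
operator keeps one state store per shuffle partition, so the total number (and
hence the size) of delta/snapshot files per micro-batch scales with
spark.sql.shuffle.partitions. A hedged PySpark sketch for a low-volume stream
(the value 8 is only an example):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("fewer-state-files")
         .config("spark.sql.shuffle.partitions", "8")
         .getOrCreate())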

* * *

The numbers for checkpoint directory are, of course, captured when it was
set to a local drive (or Lustre/NFS.).

For HDFS there are obviously no local file system calls for the checkpoint
store, as HDFS does not present itself as an OS-level file system.
Nevertheless the name of checkpoint directory was transmitted over HDFS
connection socket 1,675 times per microbatch, so the number of high-level
HDFS file operations must have been at least that high.

* * *

On a related note, for 920,000 events Spark made 700,000 attempts to
execute chmod or readlink program, i.e. to launch an external subprocess
with an executable in order to perform a file operation. Those 900,000
attempts actually represent 150,000 cycles, and in each cycle Spark tried
to launch the program from 6 different locations (/usr/local/sbin ->
/usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin),  until it
finally finds it in the last. But then on the next cycle Spark/Hadoop does
not re-use the knowledge of a previously found utility location, and
repeats the search from the very start causing useless file system search
operations over and over again.

This may or may not matter when HDFS is used for checkpoint store
(depending on how HDFS server implements the calls), but it does matter
when a file system like Lustre or NFS is used for checkpoint storage.
(Not to mention spawning readlink and chmod does not seem like a bright
idea in the first place, although perhaps there might be a reason why
Hadoop layer does it this way).

Thanks,
Sergey

On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim 
wrote:

> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch.
> Second, you'd want to dive inside the checkpoint directory and have
> separate numbers per top-subdirectory.
>
> After that we can see whether the value would make sense or not.
>
> Regarding file I/O issues on SS, two issues I know about are:
> 1) If you use streaming aggregation, it unnecessarily creates a temporary
> file for both read and write on the state store, while the file is only
> needed for writing. That makes the number of file creations to be 2x. The
> patch is proposed under SPARK-30294 [1].
>
> 2) Spark leverages HDFS API which is configured to create crc file per
> file by default. (So you'll have 2x files than expected.) There's a bug in
> HDFS API (HADOOP-16255 [2]) which missed to handle crc files during rename
> (in short of how 

Excessive disk IO with Spark structured streaming

2020-10-04 Thread Sergey Oboguev
 I am trying to run a Spark structured streaming program simulating basic
scenario of ingesting events and calculating aggregates on a window with
watermark, and I am observing an inordinate amount of disk IO Spark
performs.

The basic structure of the program is like this:

sparkSession = SparkSession.builder()
   .appName()
   .master("local[*]")
   .config("spark.executor.memory", "8g")
   .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
   .config("spark.kryoserializer.buffer", "8m")
   .config("spark.local.dir", ...local directory...)
   .getOrCreate();

sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app
...);

dataset = sparkSession.readStream()
  .option("checkpointLocation", ... checkpoint dir for
source ...)
  .format(MockStreamingSource.class.getName())
  .load();

Dataset<Row> ds = dataset
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
  functions.window(functions.col("timestamp"),
"2 minutes"),
  functions.col("source"))
  .agg(
  functions.avg("D0").as("AVG_D0"),
  functions.avg("I0").as("AVG_I0"));

DataStreamWriter<Row> dsw = ds.writeStream()
  // .trigger(Trigger.ProcessingTime("1
minute"))
  .option("checkpointLocation", .. checkpoint
dir for writer ... );

dsw.outputMode(OutputMode.Append())
   .format("console")
   .option("truncate", "false")
   .option("numRows", Integer.MAX_VALUE)
   .start()
   .awaitTermination();


MockStreamingSource is just that -- a source intended to provide a
simulated input. It generates microbatches of mock events and sends them to
the app. In the testing scenario, the source simulates 20,000 devices each
sending an event every 15 seconds for 11.5 minutes of logical time (just
under 12 minutes of window size + watermark), for a total number of 920,000
events.

I initially started with microbatch sized to 500 events, and processing
performance was totally dismal because of disk IO. I then increased
microbatch size and performance got better, but still very poor. Microbatch
size now is 13,334 events per batch, this corresponds to ingestion interval
of 10 seconds. Smaller batches resulted in worse performance.

But even with microbatch sized 13,334 event performance is poor because of
excessive disk IO generated by Spark.
Just ingesting data generated intra-app takes the program physical time
equal to 40% of window size + watermark.

Using strace, I measured that checkpoint directory for the stream writer
receives the following number of Linux system calls:

create/open file = 60,500 calls
mkdir = 57,000
readlink = 59,000
unlink = 41,900
rename = 14,700
execve readlink=353,000 (incl. repetitive searches of readlink executable
in 6 different locations)
execve chmod=340,620 (incl. repetitive searches of chmod executable in 6
different locations)

In addition, Spark local directory received:

create/open file = 55,000 calls
unlink = 13,800
stat = 42,000

That's for a mere 920,000 small events (each event Row is 600 bytes when
in Java heap).

I also tried trigger(...) to see whether it can improve anything, but it
just made things worse.

Spark version 2.4.6.

Is this an expected amount of disk IO for Spark, or am I doing something
wrong and there is a way to avoid Spark generating such an amount of disk
IO?


Re: Spark watermarked aggregation query and append output mode

2020-09-23 Thread Sergey Oboguev
Thanks!

It appears one should use not *dataset.col("timestamp")*
but rather* functions.col("timestamp").*


Spark watermarked aggregation query and append output mode

2020-09-23 Thread Sergey Oboguev
Hi,

I am trying to aggregate Spark time-stamped structured stream to get
per-device (source) averages for every second of incoming data.

dataset.printSchema();   // see the output below

Dataset<Row> ds1 = dataset
  .withWatermark("timestamp", "1 second")
  .groupBy(

functions.window(dataset.col("timestamp"), "1 second", "1 second"),
   dataset.col("source"))
  .agg(
   functions.avg("D0").as("AVG_D0"),
   functions.avg("I0").as("AVG_I0"))
  .orderBy("window");

StreamingQuery query = ds1.writeStream()
  .outputMode(OutputMode.Append())
  .format("console")
  .option("truncate", "false")
  .option("numRows", Integer.MAX_VALUE)
  .start();

query.awaitTermination();

I am using Spark 2.4.6.

According to
https://spark.apache.org/docs/2.4.6/structured-streaming-programming-guide.html#output-modes
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
the above construct should work fine.

Yet I am getting an exception in the query start():

11:05:27.282 [main] ERROR my.sparkbench.example.Example - Exception
org.apache.spark.sql.AnalysisException: *Append output mode not
supported when there are streaming aggregations on streaming
DataFrames/DataSets without watermark*;;
Sort [window#44 ASC NULLS FIRST], true
+- Aggregate [window#71, source#0], [window#71 AS window#44, source#0,
avg(D0#12) AS AVG_D0#68, avg(I0#2L) AS AVG_I0#70]
   +- Filter isnotnull(timestamp#1)
  +- Project [named_struct(start,
precisetimestampconversion(CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(100 as double)))
as double) = (cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(100 as double)))
THEN (CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(100 as double))) +
cast(1 as bigint)) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
LongType) - 0) as double) / cast(100 as double))) END + cast(0 as
bigint)) - cast(1 as bigint)) * 100) + 0), LongType,
TimestampType), end, precisetimestampconversion((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(100 as double)))
as double) = (cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(100 as double)))
THEN (CEIL((cast((precisetimestampconversion(timestamp#1,
TimestampType, LongType) - 0) as double) / cast(100 as double))) +
cast(1 as bigint)) ELSE
CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType,
LongType) - 0) as double) / cast(100 as double))) END + cast(0 as
bigint)) - cast(1 as bigint)) * 100) + 0) + 100), LongType,
TimestampType)) AS window#71, source#0, timestamp#1-T1000ms, I0#2L,
I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L,
D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]
 +- EventTimeWatermark timestamp#1: timestamp, interval 1 seconds
+- StreamingRelationV2
my.sparkbench.datastreamreader.MyStreamingSource@6897a4a,
my.sparkbench.datastreamreader.MyStreamingSource, [source#0,
timestamp#1, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L,
I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18,
D7#19, D8#20, D9#21]

at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
at 
org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
at 
org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:256)
at 
org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
at 
org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
at my.sparkbench.example.Example.streamGroupByResult(Example.java:113)
at my.sparkbench.example.Example.exec_main(Example.java:76)
at my.sparkbench.example.Example.do_main(Example.java:42)
at my.sparkbench.example.Example.main(Example.java:34)

even though there is a watermark on the stream.

Schema printout looks fine:

root
 |-- source: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- I0: long (nullable = false)
 |-- I1: long (nullable = false)
 |-- I2: long (nullable = false)
 |-- I3: long (nullable = false)
 |-- I4: long (nullable = false)
 |-- I5: long (nullable = false)
 |-- I6: long (nullable = false)
 |-- I7: long (nullable = false)
 |-- I8: 

Re: Dynamic metric names

2019-05-06 Thread Sergey Zhemzhitsky
Hi Saisai,

Thanks a lot for the link! This is exactly what I need.
Just curious why this PR has not been merged, as it seems to implement
a rather natural requirement.

There are a number of use cases which can benefit from this feature, e.g.
- collecting business metrics based on the data's attributes and reporting
them into the monitoring system as a side effect of the data processing
- visualizing technical metrics by means of alternative software (e.g.
grafana) - currently it's hardly possible to know the actual number of
jobs, stages, tasks and their names and IDs in advance to register all the
corresponding metrics statically.


Kind Regards,
Sergey


On Mon, May 6, 2019, 16:07 Saisai Shao  wrote:

> I remembered there was a PR about doing similar thing (
> https://github.com/apache/spark/pull/18406). From my understanding, this
> seems like a quite specific requirement, it may requires code change to
> support your needs.
>
> Thanks
> Saisai
>
> Sergey Zhemzhitsky  wrote on Sat, May 4, 2019 at 4:44 PM:
>
>> Hello Spark Users!
>>
>> Just wondering whether it is possible to register a metric source without
>> metrics known in advance and add the metrics themselves to this source
>> later on?
>>
>> It seems that currently MetricSystem puts all the metrics from the
>> source's MetricRegistry into a shared MetricRegistry of a MetricSystem
>> during metric source registration [1].
>>
>> So in case there is a new metric with a new name added to the source's
>> registry after this source registration, then this new metric will not be
>> reported to the sinks.
>>
>> What I'd like to achieve is to be able to register new metrics with new
>> names dynamically using a single metric source.
>> Is it somehow possible?
>>
>>
>> [1]
>> https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162
>>
>


Dynamic metric names

2019-05-04 Thread Sergey Zhemzhitsky
Hello Spark Users!

Just wondering whether it is possible to register a metric source without
metrics known in advance and add the metrics themselves to this source
later on?

It seems that currently MetricSystem puts all the metrics from the source's
MetricRegistry into a shared MetricRegistry of a MetricSystem during metric
source registration [1].

So in case there is a new metric with a new name added to the source's
registry after this source registration, then this new metric will not be
reported to the sinks.

What I'd like to achieve is to be able to register new metrics with new
names dynamically using a single metric source.
Is it somehow possible?


[1]
https://github.com/apache/spark/blob/51de86baed0776304c6184f2c04b6303ef48df90/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L162


Re: FW: Spark2 and Hive metastore

2018-11-12 Thread Sergey B.
In order for the Spark to see Hive metastore you need to build Spark
Session accordingly:

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("myApp")
  .config("hive.metastore.uris","thrift://localhost:9083")
  .enableHiveSupport()
  .getOrCreate()

On Mon, Nov 12, 2018 at 11:49 AM Ирина Шершукова 
wrote:

>
>
> hello guys, Spark 2.1.0 couldn’t connect to an existing Hive metastore.
>
>
>
>
>


Re: Accumulator guarantees

2018-05-10 Thread Sergey Zhemzhitsky
As far as I understand updates of the custom accumulators at the
driver side happen during task completion [1].
The documentation states [2] that the very last stage in a job
consists of multiple ResultTasks, which execute the task and send its
output back to the driver application.
Also sources prove [3] that accumulators are updated for the
ResultTasks just once.

So it seems that accumulators are safe to use and will be applied
only once regardless of where they are used (transformations as well
as actions) in case these transformations and actions belong to
the last stage. Is that correct?
Could any of the Spark committers or contributors please confirm it?


[1] 
https://github.com/apache/spark/blob/3990daaf3b6ca2c5a9f7790030096262efb12cb2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1194
[2] 
https://github.com/apache/spark/blob/a6fc300e91273230e7134ac6db95ccb4436c6f8f/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L36
[3] 
https://github.com/apache/spark/blob/3990daaf3b6ca2c5a9f7790030096262efb12cb2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1204

On Thu, May 10, 2018 at 10:24 PM, Sergey Zhemzhitsky  wrote:
> Hi there,
>
> Although Spark's docs state that there is a guarantee that
> - accumulators in actions will only be updated once
> - accumulators in transformations may be updated multiple times
>
> ... I'm wondering whether the same is true for transformations in the
> last stage of the job, or whether accumulators
> running in transformations of the last stage are guaranteed to be
> updated once too?

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Accumulator guarantees

2018-05-10 Thread Sergey Zhemzhitsky
Hi there,

Although Spark's docs state that there is a guarantee that
- accumulators in actions will only be updated once
- accumulators in transformations may be updated multiple times

... I'm wondering whether the same is true for transformations in the
last stage of the job, or whether accumulators
running in transformations of the last stage are guaranteed to be
updated once too?
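
For illustration, a REPL-style sketch of the two cases (using the Spark 2.x
longAccumulator API; names and numbers are made up). The count from the map is exact
only as long as the stage is not recomputed and tasks are not retried, which is exactly
the guarantee in question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("acc-guarantees").getOrCreate()
val sc = spark.sparkContext

val inTransformation = sc.longAccumulator("updated-in-map")
val inAction = sc.longAccumulator("updated-in-foreach")

val rdd = sc.parallelize(1 to 100, numSlices = 4)
  .map { x => inTransformation.add(1); x }   // transformation: may be re-applied on recomputation

rdd.foreach(_ => inAction.add(1))            // action: updates are applied once per successful task

println(s"map count = ${inTransformation.value}, foreach count = ${inAction.value}")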

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: AccumulatorV2 vs AccumulableParam (V1)

2018-05-04 Thread Sergey Zhemzhitsky
Hi Wenchen,

Thanks a lot for clarification and help.

Here is what I mean regarding the remaining points

For 2: Should we update the documentation [1] regarding custom
accumulators to be more clear and to highlight that
  a) custom accumulators should always override "copy" method to
prevent unexpected behaviour with losing type information
  b) custom accumulators cannot be direct anonymous subclasses of
AccumulatorV2 because of a)
  c) extending already existing accumulators almost always requires
overriding "copy" because of a)

For 3: Here is [2] the sample that shows that the same
AccumulableParam can be registered twice with different names.
Here is [3] the sample that fails with IllegalStateException on this
line [4] because accumulator's metadata is not null and it's hardly
possible to reset it to null (there is no public API for such a
thing).
I understand that Spark creates different Accumulators for the same
AccumulableParam internally, and because AccumulatorV2 is stateful,
using the same stateful accumulator instance in multiple places for
different things is very dangerous, so maybe we should highlight this
point in the documentation too?

For 5: Should we raise a JIRA for that?


[1] https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
[2] 
https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L36
[3] 
https://gist.github.com/szhem/52a26ada4bbeb1a3e762710adc3f94ef#file-accumulatorsspec-scala-L59
[4] 
https://github.com/apache/spark/blob/4d5de4d303a773b1c18c350072344bd7efca9fc4/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L51


Kind Regards,
Sergey

On Thu, May 3, 2018 at 5:20 PM, Wenchen Fan  wrote:
> Hi Sergey,
>
> Thanks for your valuable feedback!
>
> For 1: yea this is definitely a bug and I have sent a PR to fix it.
> For 2: I have left my comments on the JIRA ticket.
> For 3: I don't quite understand it, can you give some concrete examples?
> For 4: yea this is a problem, but I think it's not a big deal, and we
> couldn't find a better solution at that time.
> For 5: I think this is a real problem. It looks to me that we can merge
> `isZero`, `copyAndReset`, `copy`, `reset` into one API: `zero`, which is
> basically just the `copyAndReset`. If there is a way to fix this without
> breaking the existing API, I'm really happy to do it.
> For 6: same as 4. It's a problem but not a big deal.
>
> In general, I think accumulator v2 sacrifices some flexibility to simplify
> the framework and improve the performance. Users can still use accumulator
> v1 if flexibility is more important to them. We can keep improving
> accumulator v2 without breaking backward compatibility.
>
> Thanks,
> Wenchen
>
> On Thu, May 3, 2018 at 6:20 AM, Sergey Zhemzhitsky 
> wrote:
>>
>> Hello guys,
>>
>> I've started to migrate my Spark jobs which use Accumulators V1 to
>> AccumulatorV2 and faced with the following issues:
>>
>> 1. LegacyAccumulatorWrapper now requires the resulting type of
>> AccumulableParam to implement equals. In other case the
>> AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper,
>> will fail with AssertionError (SPARK-23697 [1]).
>>
>> 2. Existing AccumulatorV2 classes are difficult to extend
>> easily and correctly (SPARK-24154 [2]) due to their "copy" method, which
>> is called during serialization and usually loses type information of
>> descendant classes which don't override "copy" (and it's easier to
>> implement an accumulator from scratch than override it correctly)
>>
>> 3. The same instance of AccumulatorV2 cannot be used with the same
>> SparkContext multiple times (unlike AccumulableParam) failing with
>> "IllegalStateException: Cannot register an Accumulator twice" even
>> after "reset" method called. So it's impossible to unregister already
>> registered accumulator from user code.
>>
>> 4. AccumulableParam (V1) implementations are usually more or less
>> stateless, while AccumulatorV2 implementations are almost always
>> stateful, leading to (unnecessary?) type checks (unlike
>> AccumulableParam). For example typical "merge" method of AccumulatorV2
>> requires to check whether current accumulator is of an appropriate
>> type, like here [3]
>>
>> 5. AccumulatorV2 is more difficult to implement correctly unlike
>> AccumulableParam. For example, in case of AccumulableParam I have to
>> implement just 3 methods (addAccumulator, addInPlace, zero), in case
>> of AccumulatorParam - just 2 methods (addInPlace, zero) and in case of
>> AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value)

AccumulatorV2 vs AccumulableParam (V1)

2018-05-02 Thread Sergey Zhemzhitsky
Hello guys,

I've started to migrate my Spark jobs which use Accumulators V1 to
AccumulatorV2 and faced with the following issues:

1. LegacyAccumulatorWrapper now requires the resulting type of
AccumulableParam to implement equals. In other case the
AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper,
will fail with AssertionError (SPARK-23697 [1]).

2. Existing AccumulatorV2 classes are difficult to extend
easily and correctly (SPARK-24154 [2]) due to their "copy" method, which
is called during serialization and usually loses type information of
descendant classes which don't override "copy" (and it's easier to
implement an accumulator from scratch than override it correctly)

3. The same instance of AccumulatorV2 cannot be used with the same
SparkContext multiple times (unlike AccumulableParam) failing with
"IllegalStateException: Cannot register an Accumulator twice" even
after "reset" method called. So it's impossible to unregister already
registered accumulator from user code.

4. AccumulableParam (V1) implementations are usually more or less
stateless, while AccumulatorV2 implementations are almost always
stateful, leading to (unnecessary?) type checks (unlike
AccumulableParam). For example typical "merge" method of AccumulatorV2
requires to check whether current accumulator is of an appropriate
type, like here [3]

5. AccumulatorV2 is more difficult to implement correctly unlike
AccumulableParam. For example, in case of AccumulableParam I have to
implement just 3 methods (addAccumulator, addInPlace, zero), in case
of AccumulatorParam - just 2 methods (addInPlace, zero) and in case of
AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value)

6. AccumulatorV2 classes are hardly possible to be anonymous classes,
because of their "copy" and "merge" methods which typically require a
concrete class to make a type check.

I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but
just wondering whether there is a way to simplify the API of
AccumulatorV2 to meet the points described above and to be less error
prone?


[1] https://issues.apache.org/jira/browse/SPARK-23697
[2] https://issues.apache.org/jira/browse/SPARK-24154
[3] 
https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
[4] https://issues.apache.org/jira/browse/SPARK-14654
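
For reference, a minimal sketch of a custom AccumulatorV2 (a distinct-string collector;
the class and names are made up) illustrating the boilerplate from point 5 and why
copy() has to return the concrete subclass (points 2 and 6):

import org.apache.spark.util.AccumulatorV2

class StringSetAccumulator extends AccumulatorV2[String, Set[String]] {
  private var set: Set[String] = Set.empty

  override def isZero: Boolean = set.isEmpty
  override def copy(): StringSetAccumulator = {   // must return the concrete subclass
    val acc = new StringSetAccumulator
    acc.set = set
    acc
  }
  override def reset(): Unit = set = Set.empty
  override def add(v: String): Unit = set += v
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit = other match {
    case o: StringSetAccumulator => set ++= o.set   // the type check mentioned in point 4
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
  }
  override def value: Set[String] = set
}

// Usage (illustrative): val acc = new StringSetAccumulator; sc.register(acc, "distinct-keys")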

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: DataFrames :: Corrupted Data

2018-03-28 Thread Sergey Zhemzhitsky
I doubt that this issue is connected with
the string encoding, because

- "pr^?files.10056.10040" should be "profiles.10056.10040" and is
defined as constant in the source code
- 
"profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@"
should no occur in exception at all, because such a strings are not
created within the job
- the strings being corrupted are defined within the job and there are
no such input data
- when yarn restarts the job for the second time after the first
failure, the job completes successfully




On Wed, Mar 28, 2018 at 10:31 PM, Jörn Franke  wrote:
> Encoding issue of the data? E.g. Spark uses UTF-8, but the source encoding is
> different?
>
>> On 28. Mar 2018, at 20:25, Sergey Zhemzhitsky  wrote:
>>
>> Hello guys,
>>
>> I'm using Spark 2.2.0 and from time to time my job fails printing into
>> the log the following errors
>>
>> scala.MatchError:
>> profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>> scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
>>
>> The job itself looks like the following and contains a few shuffles and UDAFs
>>
>> val df = spark.read.avro(...).as[...]
>>  .groupBy(...)
>>  .agg(collect_list(...).as(...))
>>  .select(explode(...).as(...))
>>  .groupBy(...)
>>  .agg(sum(...).as(...))
>>  .groupBy(...)
>>  .agg(collectMetrics(...).as(...))
>>
>> The errors occur in the collectMetrics UDAF in the following snippet
>>
>> key match {
>>  case "profiles.total" => updateMetrics(...)
>>  case "profiles.biz" => updateMetrics(...)
>>  case ProfileAttrsRegex(...) => updateMetrics(...)
>> }
>>
>> ... and I'm absolutely ok with scala.MatchError because there is no
>> "catch all" case in the pattern matching expression, but the strings
>> containing corrupted characters seem to be very strange.
>>
>> I've found the following JIRA issues, but it's hard to say
>> whether they are related to my case:
>> - https://issues.apache.org/jira/browse/SPARK-22092
>> - https://issues.apache.org/jira/browse/SPARK-23512
>>
>> So I'm wondering, has anybody ever seen such kind of behaviour and
>> what could be the problem?
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



DataFrames :: Corrupted Data

2018-03-28 Thread Sergey Zhemzhitsky
Hello guys,

I'm using Spark 2.2.0 and from time to time my job fails printing into
the log the following errors

scala.MatchError:
profiles.total^@^@f2-a733-9304fda722ac^@^@^@^@profiles.10361.10005^@^@^@^@.total^@^@0075^@^@^@^@
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)
scala.MatchError: pr^?files.10056.10040 (of class java.lang.String)

The job itself looks like the following and contains a few shuffles and UDAFs

val df = spark.read.avro(...).as[...]
  .groupBy(...)
  .agg(collect_list(...).as(...))
  .select(explode(...).as(...))
  .groupBy(...)
  .agg(sum(...).as(...))
  .groupBy(...)
  .agg(collectMetrics(...).as(...))

The errors occur in the collectMetrics UDAF in the following snippet

key match {
  case "profiles.total" => updateMetrics(...)
  case "profiles.biz" => updateMetrics(...)
  case ProfileAttrsRegex(...) => updateMetrics(...)
}

... and I'm absolutely ok with scala.MatchError because there is no
"catch all" case in the pattern matching expression, but the strings
containing corrupted characters seem to be very strange.

I've found the following JIRA issues, but it's hard to say
whether they are related to my case:
- https://issues.apache.org/jira/browse/SPARK-22092
- https://issues.apache.org/jira/browse/SPARK-23512

So I'm wondering, has anybody ever seen such kind of behaviour and
what could be the problem?
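
For what it's worth, a small self-contained diagnostic sketch (hypothetical names, not
the original UDAF code): a catch-all case that dumps the raw code points of an unexpected
key makes it easier to tell whether the value already arrives corrupted or gets mangled
later in the pipeline.

val ProfileAttrsRegex = """profiles\.(\d+)\.(\d+)""".r

def classify(key: String): String = key match {
  case "profiles.total"            => "total"
  case "profiles.biz"              => "biz"
  case ProfileAttrsRegex(grp, att) => s"attr:$grp.$att"
  case other =>
    val codePoints = other.map(c => f"U+${c.toInt}%04X").mkString(" ")
    throw new IllegalArgumentException(s"Unexpected key '$other' (code points: $codePoints)")
}

println(classify("profiles.10056.10040"))   // attr:10056.10040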

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Best way of shipping self-contained pyspark jobs with 3rd-party dependencies

2017-12-08 Thread Sergey Zhemzhitsky
Hi PySparkers,

What currently is the best way of shipping self-contained pyspark jobs
with 3rd-party dependencies?
There are some open JIRA issues [1], [2] as well as corresponding PRs
[3], [4] and articles [5], [6], [7] regarding setting up the python
environment with conda and virtualenv respectively, and I believe [7]
is a misleading article, because of unsupported Spark options, like
spark.pyspark.virtualenv.enabled,
spark.pyspark.virtualenv.requirements, etc.

So I'm wondering what the community does in cases, when it's necessary to
- prevent python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job dependencies
- track which dependencies are introduced on the per-job basis


[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark
[6] http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv
[7] 
https://community.hortonworks.com/articles/104947/using-virtualenv-with-pyspark.html

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Best way of shipping self-contained pyspark jobs with 3rd-party dependencies

2017-12-07 Thread Sergey Zhemzhitsky
Hi PySparkers,

What currently is the best way of shipping self-contained pyspark jobs with
3rd-party dependencies?
There are some open JIRA issues [1], [2] as well as corresponding PRs [3],
[4] and articles [5], [6], regarding setting up the python environment with
conda and virtualenv respectively.

So I'm wondering what the community does in cases, when it's necessary to
- prevent python package/module version conflicts between different jobs
- prevent updating all the nodes of the cluster in case of new job
dependencies
- track which dependencies are introduced on the per-job basis


[1] https://issues.apache.org/jira/browse/SPARK-13587
[2] https://issues.apache.org/jira/browse/SPARK-16367
[3] https://github.com/apache/spark/pull/13599
[4] https://github.com/apache/spark/pull/14180
[5] https://www.anaconda.com/blog/developer-blog/conda-spark/
[6]
http://henning.kropponline.de/2016/09/17/running-pyspark-with-virtualenv/


What is the purpose of having RDD.context and RDD.sparkContext at the same time?

2017-06-27 Thread Sergey Zhemzhitsky
Hello spark gurus,

Could you please shed some light on the purpose of having two
identical functions in RDD,
RDD.context [1] and RDD.sparkContext [2]?

RDD.context seems to be used more frequently across the source code.

[1]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1693
[2]
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L146

Kind Regards,
Sergey


Re: Is GraphX really deprecated?

2017-05-15 Thread Sergey Zhemzhitsky
GraphFrames seems promising, but a lot of its algorithms still involve GraphX in one
way or another, or run on top of GraphX according to the GitHub repo (
https://github.com/graphframes/graphframes/tree/master/src/main/scala/org/graphframes/lib),
and in the case of RDDs and semi-structured data it's not really necessary to
include another library that will just delegate to GraphX, which is still
shipped with Spark as the default graph-processing module.

Also, doesn't the Pregel-like programming abstraction of GraphX (although it is
implemented on top of RDD joins) seem more natural than a number of join steps in
GraphFrames? I believe such an abstraction wouldn't hurt GraphFrames either.
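
For readers unfamiliar with it, a small self-contained sketch of that Pregel-style API:
single-source shortest paths over a toy graph, which is the standard shape of a GraphX
Pregel program (the graph data is made up and sc is an existing SparkContext).

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, Graph}

def shortestPaths(sc: SparkContext, sourceId: Long): Unit = {
  val edges = sc.parallelize(Seq(
    Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0), Edge(3L, 4L, 1.0)))
  val graph = Graph.fromEdges(edges, defaultValue = 0.0)

  val initialized = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity)

  val sssp = initialized.pregel(Double.PositiveInfinity)(
    (_, dist, newDist) => math.min(dist, newDist),             // vertex program
    triplet =>                                                 // send messages along cheaper edges
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      else Iterator.empty,
    (a, b) => math.min(a, b))                                  // merge messages

  sssp.vertices.collect().foreach(println)
}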



On May 14, 2017 19:07, "Jules Damji"  wrote:

GraphFrames is not part of Spark core the way Structured Streaming is; it's
still open source and part of Spark Packages. But I anticipate that as it
reaches parity with GraphX in algorithms & functionality, it's
not unreasonable to expect its wide adoption and preference.

To get a flavor have a go at it
https://databricks.com/blog/2016/03/03/introducing-graphframes.html

Cheers
Jules

Sent from my iPhone
Pardon the dumb thumb typos :)

On May 13, 2017, at 2:01 PM, Jacek Laskowski  wrote:

Hi,

I'd like to hear the official statement too.

My take on GraphX and Spark Streaming is that they are long dead projects
with GraphFrames and Structured Streaming taking their place, respectively.

Jacek

On 13 May 2017 3:00 p.m., "Sergey Zhemzhitsky"  wrote:

> Hello Spark users,
>
> I just would like to know whether the GraphX component should be
> considered deprecated and no longer actively maintained
> and should not be considered when starting new graph-processing projects
> on top of Spark in favour of other
> graph-processing frameworks?
>
> I'm asking because
>
> 1. According to some discussions in GitHub pull requests, there are
> thoughts that GraphX is not under active development and
> can probably be deprecated soon.
>
> https://github.com/apache/spark/pull/15125
>
> 2. According to JIRA activity the GraphX component seems to be not very
> active and quite a lot of improvements are
> resolved as "Won't fix" even with pull requests provided.
>
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20S
> PARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%
> 20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Wo
> n%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%
> 22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC
>
> So, I'm wondering what the community who uses GraphX, and the committers who
> develop it, think regarding this Spark component?
>
> Kind regards,
> Sergey
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Is GraphX really deprecated?

2017-05-13 Thread Sergey Zhemzhitsky
Hello Spark users,

I just would like to know whether the GraphX component should be considered 
deprecated and no longer actively maintained
and should not be considered when starting new graph-processing projects on top 
of Spark in favour of other
graph-processing frameworks?

I'm asking because

1. According to some discussions in GitHub pull requests, there are thoughts 
that GraphX is not under active development and
can probably be deprecated soon.

https://github.com/apache/spark/pull/15125

2. According to JIRA activity the GraphX component seems to be not very active
and quite a lot of improvements are
resolved as "Won't fix" even with pull requests provided.

https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20component%20%3D%20GraphX%20AND%20resolution%20in%20(%22Unresolved%22%2C%20%22Won%27t%20Fix%22%2C%20%22Won%27t%20Do%22%2C%20Later%2C%20%22Not%20A%20Bug%22%2C%20%22Not%20A%20Problem%22)%20ORDER%20BY%20created%20DESC

So, I'm wondering what the community who uses GraphX, and the committers who develop
it, think regarding this Spark component?

Kind regards,
Sergey



-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark API authentication

2017-04-14 Thread Sergey Grigorev

Thanks for help!

I've found a ticket with a similar problem:
https://issues.apache.org/jira/browse/SPARK-19652. It looks like this
fix did not make it into the 2.1.0 release.
You said that for the second example the custom filter is not supported. Is
that a bug or expected behavior?


On 14.04.2017 13:22, Saisai Shao wrote:
AFAIK, for the first one the custom filter should work. But for the
latter it is not supported.


On Fri, Apr 14, 2017 at 6:17 PM, Sergey Grigorev  wrote:


GET requests like http://worker:4040/api/v1/applications or
http://master:6066/v1/submissions/status/driver-20170414025324-
return a successful result. But if I open the Spark master web UI
then it requests username and password.


On 14.04.2017 12:46, Saisai Shao wrote:

Hi,

What specifically are you referring to "Spark API endpoint"?

Filter can only be worked with Spark Live and History web UI.

On Fri, Apr 14, 2017 at 5:18 PM, Sergey  wrote:

Hello all,

I've added own spark.ui.filters to enable basic
authentication to access to
Spark web UI. It works fine, but I still can do requests to
spark API
without any authentication.
Is there any way to enable authentication for API endpoints?

P.S. spark version is 2.1.0, deploy mode is standalone.



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-API-authentication-tp28601.html

Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org









Re: Spark API authentication

2017-04-14 Thread Sergey Grigorev
GET requests like http://worker:4040/api/v1/applications or
http://master:6066/v1/submissions/status/driver-20170414025324-
return a successful result. But if I open the Spark master web UI then it
requests username and password.


On 14.04.2017 12:46, Saisai Shao wrote:

Hi,

What specifically are you referring to "Spark API endpoint"?

Filter can only be worked with Spark Live and History web UI.

On Fri, Apr 14, 2017 at 5:18 PM, Sergey  wrote:


Hello all,

I've added own spark.ui.filters to enable basic authentication to
access to
Spark web UI. It works fine, but I still can do requests to spark API
without any authentication.
Is there any way to enable authentication for API endpoints?

P.S. spark version is 2.1.0, deploy mode is standalone.



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-API-authentication-tp28601.html

Sent from the Apache Spark User List mailing list archive at
Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org






Spark API authentication

2017-04-14 Thread Sergey
Hello all,

I've added my own spark.ui.filters to enable basic authentication for access to
the Spark web UI. It works fine, but I can still make requests to the Spark API
without any authentication.
Is there any way to enable authentication for the API endpoints?

P.S. spark version is 2.1.0, deploy mode is standalone.
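
For anyone hitting the same thing: the filter side of this looks roughly like the sketch
below (the class and parameter names are made up; the spark.<filter class>.param.<name>
convention is the one described in the spark.ui.filters documentation). As discussed
above, in 2.1.0 this appears to protect only the web UI pages, not the REST/API
endpoints (see SPARK-19652).

import java.util.Base64
import javax.servlet._
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

// Enable with (illustrative values):
//   spark.ui.filters=com.example.BasicAuthFilter
//   spark.com.example.BasicAuthFilter.param.user=admin
//   spark.com.example.BasicAuthFilter.param.password=secret
class BasicAuthFilter extends Filter {
  private var expected: String = _

  override def init(conf: FilterConfig): Unit = {
    val user = Option(conf.getInitParameter("user")).getOrElse("admin")
    val password = Option(conf.getInitParameter("password")).getOrElse("secret")
    expected = "Basic " + Base64.getEncoder.encodeToString(s"$user:$password".getBytes("UTF-8"))
  }

  override def doFilter(req: ServletRequest, res: ServletResponse, chain: FilterChain): Unit = {
    val httpRes = res.asInstanceOf[HttpServletResponse]
    val header = req.asInstanceOf[HttpServletRequest].getHeader("Authorization")
    if (expected == header) {
      chain.doFilter(req, res)   // credentials match: let the request through
    } else {
      httpRes.setHeader("WWW-Authenticate", "Basic realm=\"Spark\"")
      httpRes.sendError(HttpServletResponse.SC_UNAUTHORIZED)
    }
  }

  override def destroy(): Unit = ()
}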



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-API-authentication-tp28601.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: apache-spark doesn't work correktly with russian alphabet

2017-01-18 Thread Sergey B.
Try to get the encoding right.
E.g., if you read from `csv` or other sources, specify the encoding, which is
most probably `cp1251`:

df = sqlContext.read.csv(filePath, encoding="cp1251")

On the Linux CLI the encoding can be found with the `chardet` utility.

On Wed, Jan 18, 2017 at 3:53 PM, AlexModestov 
wrote:

> I want to use Apache Spark for working with text data. There are some
> Russian
> symbols, but Apache Spark shows me strings which look like
> "...\u0413\u041e\u0420\u041e...". What should I do to correct them?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/apache-spark-doesn-t-work-correktly-
> with-russian-alphabet-tp28316.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Adding Hive support to existing SparkSession (or starting PySpark with Hive support)

2016-12-19 Thread Sergey B.
I have asked a similar question here:

http://stackoverflow.com/questions/40701518/spark-2-0-redefining-sparksession-params-through-getorcreate-and-not-seeing-cha

Please see the answer, which basically states that it's impossible to change
the session config once it has been initiated.

On Mon, Dec 19, 2016 at 9:01 PM, Venkata Naidu  wrote:

> We can create a link in the Spark conf directory to point to the hive.conf file
> of the Hive installation, I believe.
>
> Thanks,
> Venkat.
>
> On Mon, Dec 19, 2016, 10:58 AM apu  wrote:
>
>> This is for Spark 2.0:
>>
>> If I wanted Hive support on a new SparkSession, I would build it with:
>>
>> spark = SparkSession \
>> .builder \
>> .enableHiveSupport() \
>> .getOrCreate()
>>
>> However, PySpark already creates a SparkSession for me, which appears to
>> lack HiveSupport. How can I either:
>>
>> (a) Add Hive support to an existing SparkSession,
>>
>> or
>>
>> (b) Configure PySpark so that the SparkSession it creates at startup has
>> Hive support enabled?
>>
>> Thanks!
>>
>> Apu
>>
>


MapWithState would not restore from checkpoint.

2016-06-27 Thread Sergey Zelvenskiy
MapWithState would not restore from a checkpoint. The MapWithStateRDD code requires a
non-empty SparkContext, while the context is empty.


ERROR 2016-06-27 11:06:33,236 0 org.apache.spark.streaming.StreamingContext
[run-main-0] Error starting the context, marking it as stopped
org.apache.spark.SparkException: RDD transformations and actions can only
be invoked by the driver, not inside of other transformations; for example,
rdd1.map(x => rdd2.values.count() * x) is invalid because the values
transformation and count action cannot be performed inside of the rdd1.map
transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.org 
$apache$spark$rdd$RDD$$sc(RDD.scala:87)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at
org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:530)
at
org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:189)
at
org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at
org.apache.spark.streaming.dstream.MapWithStateDStreamImpl.compute(MapWithStateDStream.scala:65)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at
org.apache.spark.streaming.dstream.FlatMappedDStream.compute(FlatMappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstre
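
Not a definitive diagnosis, but the trace above goes through
MapWithStateRDD.createFromPairRDD, and this error on recovery usually means some RDD
involved in bootstrapping the state (for instance an initial-state RDD) still references
the SparkContext from before the restart. A sketch of the usual recovery pattern, where
everything, including mapWithState, is built inside the factory passed to
StreamingContext.getOrCreate (paths, host and batch interval are made up):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

def createContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf().setAppName("mapWithState-recovery")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)

  val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split("\\s+")).map((_, 1))

  val spec = StateSpec.function { (word: String, one: Option[Int], state: State[Int]) =>
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
    state.update(sum)           // running count per word kept in the state store
    (word, sum)
  }
  words.mapWithState(spec).print()
  ssc
}

val checkpointDir = "/tmp/streaming-checkpoint"
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()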

Is there a way to run a jar built for scala 2.11 on spark 1.6.1 (which is using 2.10?)

2016-05-18 Thread Sergey Zelvenskiy



RDDs caching in typical machine learning use cases

2016-04-03 Thread Sergey
Hi Spark ML experts!

Do you use RDD caching together with MLlib to speed up
computation?
I mean typical machine learning use cases:
train-test split, train, evaluate, apply model.

Sergey.
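
Not an authoritative answer, just a sketch of the typical pattern (the dataset, column
names and estimator are made up, with the usual "features"/"label" columns assumed):
cache the training split before an iterative estimator so its repeated passes do not
re-read and re-parse the source, and unpersist when done.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.sql.DataFrame
import org.apache.spark.storage.StorageLevel

def trainAndEvaluate(data: DataFrame): Unit = {
  val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)
  train.persist(StorageLevel.MEMORY_AND_DISK)   // reused on every iteration of the solver

  val model = new LogisticRegression().setMaxIter(50).fit(train)
  model.transform(test).show(5)

  train.unpersist()
}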


Random forest implementation details

2016-04-03 Thread Sergey
Hi!

I'm playing with the random forest implementation in Apache Spark.
First impression: it is not fast :-(

Does somebody know how random forest is parallelized in Spark?
I mean both fitting and predicting.

Also, what do these parameters mean? I didn't find documentation for them.
maxMemoryInMB=256, cacheNodeIds=False, checkpointInterval=10

Sergey
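
Regarding those parameters, a sketch of where they live in the Scala ML API, with their
meaning as comments (a training DataFrame `train` with the usual features/label columns
is assumed):

import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setNumTrees(100)
  .setMaxDepth(10)
  .setMaxMemoryInMB(256)     // memory budget for split-statistics aggregation; more memory lets more nodes be split per pass
  .setCacheNodeIds(false)    // if true, cache per-instance node IDs instead of passing trees to executors (helps deep trees)
  .setCheckpointInterval(10) // how often to checkpoint the cached node IDs; only used with cacheNodeIds and a checkpoint dir

val model = rf.fit(train)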


SocketTimeoutException

2016-04-01 Thread Sergey
Hi!


I get a SocketTimeoutException when I execute this piece of code for the first time.

When I re-run it, it works fine. The code just reads a csv file and
transforms it to a dataframe. Any ideas about the reason?


import pyspark_csv as pycsv
plaintext_rdd = sc.textFile(r'file:///c:\data\sample.csv')
dataframe = pycsv.csvToDataFrame(sql, plaintext_rdd)


csv file size is 50MB.



I use Spark 1.6.1 on Windows locally.


Regards,

Sergey.




Py4JJavaError                             Traceback (most recent call last)
<ipython-input> in <module>()
      2 plaintext_rdd = sc.textFile(r'file:///c:\data\sample.csv')
      3 #plaintext_rdd.count()
----> 4 dataframe = pycsv.csvToDataFrame(sql, plaintext_rdd)

C:\Users\sergun\AppData\Local\Temp\spark-f4884acc-f8fd-416b-836b-2adebfd8e027\userFiles-02d6b0ee-569a-41ee-bd80-ccff2e09725e\pyspark_csv.py in csvToDataFrame(sqlCtx, rdd, columns, sep, parseDate)

c:\spark\python\pyspark\rdd.py in first(self)
   1313         ValueError: RDD is empty
   1314         """
->  1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

c:\spark\python\pyspark\rdd.py in take(self, num)
   1295
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
->  1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298
   1299             items += res

c:\spark\python\pyspark\context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941

c:\spark\python\lib\py4j-0.9-src.zip\py4j\java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814
    815         for temp_arg in temp_args:

c:\spark\python\pyspark\sql\utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

c:\spark\python\lib\py4j-0.9-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name)
    306                 raise Py4JJavaError(
    307                     "An error occurred while calling {0}{1}{2}.\n".
--> 308                     format(target_id, ".", name), value)
    309             else:
    310                 raise Py4JError(
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0
in stage 0.0 (TID 0, localhost): org.apache.spark.SparkException:
Python worker did not connect back in time
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:136)
at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:65)
at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:134)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:101)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Accept timed out
at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
at 
java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
at 
java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at 
org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:131)
... 12 more

Driver stacktrace:
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at 
org.apache.spark.scheduler.

strange behavior of pyspark RDD zip

2016-04-01 Thread Sergey
Hi!

I'm on Spark 1.6.1 in local mode on Windows.

And I have an issue with zipping two RDDs of __equal__ size and with an
__equal__ number of partitions (I also tried to repartition both RDDs to one
partition).
I get this exception when I do rdd1.zip(rdd2).count():

File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 111, in main
  File "c:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 106, in process
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line
263, in dump_stream
vs = list(itertools.islice(iterator, batch))
  File "c:\spark\python\pyspark\rddsampler.py", line 95, in func
for obj in iterator:
  File "c:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line
322, in load_stream
" in pair: (%d, %d)" % (len(keys), len(vals)))
ValueError: Can not deserialize RDD with different number of items in
pair: (256, 512)


pySpark window functions are not working in the same way as Spark/Scala ones

2015-09-03 Thread Sergey Shcherbakov
Hello all,

I'm experimenting with Spark 1.4.1 window functions
and have come to a problem in pySpark that I've described in a Stackoverflow
question
<http://stackoverflow.com/questions/32376713/spark-window-functions-dont-work-as-expected>

In essence, the

wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank")).collect()
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
func.lead(df.b,1).over(wSpec).alias("next")).collect()

does not work in pySpark: exception for the first collect() and None output
from window function in the second collect().

While the same example in Spark/Scala works fine:

val wSpec = Window.orderBy("a")
df.select(df("a"), rank().over(wSpec).alias("rank")).collect()
df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"),
lead(df("b"),1).over(wSpec).alias("next"))

Am I doing anything wrong, or is this indeed a pySpark issue?


Best Regards,
Sergey

PS: Here is the full pySpark shell example:

from pyspark.sql.window import Window
import pyspark.sql.functions as func

l = [(1,101),(2,202),(3,303),(4,404),(5,505)]
df = sqlContext.createDataFrame(l,["a","b"])
wSpec = Window.orderBy(df.a).rowsBetween(-1,1)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
# ==> Failure org.apache.spark.sql.AnalysisException: Window function rank
does not take a frame specification.
df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
func.lead(df.b,1).over(wSpec).alias("next"))
# ===>  org.apache.spark.sql.AnalysisException: Window function lag does
not take a frame specification.;


wSpec = Window.orderBy(df.a)
df.select(df.a, func.rank().over(wSpec).alias("rank"))
# ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more
arguments are expected.

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b,
func.lead(df.b,1).over(wSpec).alias("next")).collect()
# [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202,
next=None), Row(a=3, prev=None, b=303, next=None)]


Re: Spark UI tunneling

2015-03-23 Thread Sergey Gerasimov
Akhil,

that's what I did.

The problem is that the web server probably tried to forward my request to another
address that is accessible only locally.



> On 23 March 2015, at 11:12, Akhil Das  wrote:
> 
> Did you try ssh -L 4040:127.0.0.1:4040 user@host
> 
> Thanks
> Best Regards
> 
>> On Mon, Mar 23, 2015 at 1:12 PM, sergunok  wrote:
>> Is there a way to tunnel the Spark UI?
>> 
>> I tried to tunnel client-node:4040 but my browser was redirected from
>> localhost to some domain name visible only locally within the cluster.
>> 
>> Maybe there is some startup option to make the Spark UI fully
>> accessible through a single endpoint (address:port)?
>> 
>> Serg.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-UI-tunneling-tp22184.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Stand-alone Spark on windows

2015-02-26 Thread Sergey Gerasimov
Hi!

I downloaded the Spark binaries, unpacked them, and could successfully run the
pyspark shell and write and execute some code there

BUT

I failed to submit stand-alone Python scripts or jar files via
spark-submit:
spark-submit pi.py

I always get an exception stack trace with a NullPointerException in
java.lang.ProcessBuilder.start().

What could be wrong?

Should I start some scripts before spark-submit?

I have Windows 7 and Spark 1.2.1.

Sergey.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka - streaming from multiple topics

2014-07-07 Thread Sergey Malov
I opened a JIRA issue with Spark, as an improvement though, not as a bug.
Hopefully someone there will notice it.

From: Tobias Pfeiffer 
Reply-To: user@spark.apache.org
Date: Thursday, July 3, 2014 at 9:41 PM
To: user@spark.apache.org
Subject: Re: Kafka - streaming from multiple topics

Sergey,


On Fri, Jul 4, 2014 at 1:06 AM, Sergey Malov  wrote:
On the other hand, under the hood the KafkaInputDStream which is created with this
KafkaUtils call calls ConsumerConnector.createMessageStream, which returns a
Map[String, List[KafkaStream]] keyed by topic. It is, however, not exposed.

I wonder if this is a bug. After all, KafkaUtils.createStream() returns a 
DStream[(String, String)], which pretty much looks like it should be a (topic 
-> message) mapping. However, for me, the key is always null. Maybe you could 
consider filing a bug/wishlist report?

Tobias



Re: Kafka - streaming from multiple topics

2014-07-03 Thread Sergey Malov
That’s an obvious workaround, yes, thank you Tobias.
However, I’m prototyping substitution to real batch process, where I’d have to 
create six streams (and possibly more).  It could be a bit messy.
On the other hand, under the hood the KafkaInputDStream which is created with this
KafkaUtils call calls ConsumerConnector.createMessageStream, which returns a
Map[String, List[KafkaStream]] keyed by topic. It is, however, not exposed.
So Kafka does provide the capability of creating multiple streams based on topic,
but Spark doesn't use it, which is unfortunate.

Sergey

From: Tobias Pfeiffer 
Reply-To: user@spark.apache.org
Date: Wednesday, July 2, 2014 at 9:54 PM
To: user@spark.apache.org
Subject: Re: Kafka - streaming from multiple topics

Sergey,

you might actually consider using two streams, like
  val stream1 = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
Map("retarget" -> 2))
  val stream2 = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
Map("datapair" -> 2))
to achieve what you want. This has the additional advantage that there are 
actually two connections to Kafka and data is possibly received on different 
cluster nodes, already increasing parallelism at an early stage of processing.

Tobias



On Thu, Jul 3, 2014 at 6:47 AM, Sergey Malov  wrote:
HI,
I would like to set up streaming from a Kafka cluster, reading multiple topics
and then processing each of them differently.
So, I’d create a stream

  val stream = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
Map("retarget" -> 2,"datapair" -> 2))

And then, based on whether it's the "retarget" topic or "datapair", set up a different
filter function, map function, reduce function, etc. Is it possible? I'd
assume it should be, since ConsumerConnector can return a map of KafkaStreams keyed on
topic, but I can't find how that would be visible to Spark.

Thank you,

Sergey Malov




Kafka - streaming from multiple topics

2014-07-02 Thread Sergey Malov
Hi,
I would like to set up streaming from a Kafka cluster, reading multiple topics
and then processing each of them differently.
So, I’d create a stream

  val stream = KafkaUtils.createStream(ssc,"localhost:2181","logs", 
Map("retarget" -> 2,"datapair" -> 2))

And then, based on whether it's the "retarget" topic or "datapair", set up a different
filter function, map function, reduce function, etc. Is it possible? I'd
assume it should be, since ConsumerConnector can return a map of KafkaStreams keyed on
topic, but I can't find how that would be visible to Spark.

Thank you,

Sergey Malov
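
A sketch of what the per-topic handling could look like with the receiver-based API
available at the time, following the two-stream suggestion from the replies above (ssc,
topic names and the parsing logic are made up):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def buildStreams(ssc: StreamingContext): Unit = {
  // One receiver per topic, so each topic gets its own pipeline.
  val retargets = KafkaUtils.createStream(ssc, "localhost:2181", "logs", Map("retarget" -> 2))
  val datapairs = KafkaUtils.createStream(ssc, "localhost:2181", "logs", Map("datapair" -> 2))

  // "retarget" topic: keep non-empty messages and count them per batch.
  retargets.map(_._2).filter(_.nonEmpty).count().print()

  // "datapair" topic: a different parsing/filtering pipeline.
  datapairs.map(_._2).map(_.split('\t')).filter(_.length == 2).print()
}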



Re: Unable to redirect Spark logs to slf4j

2014-03-05 Thread Sergey Parhomenko
Hi Patrick,

Thanks for the patch. I tried building a patched version
of spark-core_2.10-0.9.0-incubating.jar but the Maven build fails:
[ERROR]
/home/das/Work/thx/incubator-spark/core/src/main/scala/org/apache/spark/Logging.scala:22:
object impl is not a member of package org.slf4j
[ERROR] import org.slf4j.impl.StaticLoggerBinder
[ERROR]  ^
[ERROR]
/home/das/Work/thx/incubator-spark/core/src/main/scala/org/apache/spark/Logging.scala:106:
not found: value StaticLoggerBinder
[ERROR] val binder = StaticLoggerBinder.getSingleton
[ERROR]  ^
[ERROR] two errors found

The module only has a compile dependency on slf4j-api, and not
on slf4j-log4j12 or any other slf4j logging modules which
provide org.slf4j.impl.StaticLoggerBinder. Adding slf4j-log4j12 with
compile scope helps, and I confirm the logging is redirected to
slf4j/Logback correctly now with the patched module. I'm not sure however
if using compile scope for slf4j-log4j12 is a good idea.

--
Best regards,
Sergey Parhomenko


On 5 March 2014 20:11, Patrick Wendell  wrote:

> Hey All,
>
> We have a fix for this but it didn't get merged yet. I'll put it as a
> blocker for Spark 0.9.1.
>
>
> https://github.com/pwendell/incubator-spark/commit/66594e88e5be50fca073a7ef38fa62db4082b3c8
>
> https://spark-project.atlassian.net/browse/SPARK-1190
>
> Sergey if you could try compiling Spark with this batch and seeing if
> it works that would be great.
>
> Thanks,
> Patrick
>
>
> On Wed, Mar 5, 2014 at 10:26 AM, Paul Brown  wrote:
> >
> > Hi, Sergey --
> >
> > Here's my recipe, implemented via Maven; YMMV if you need to do it via
> sbt,
> > etc., but it should be equivalent:
> >
> > 1) Replace org.apache.spark.Logging trait with this:
> > https://gist.github.com/prb/bc239b1616f5ac40b4e5 (supplied by Patrick
> during
> > the discussion on the dev list)
> > 2) Amend your POM using the fragment that's in the same gist.
> >
> > We build two shaded JARs from the same build, one for the driver and one
> for
> > the worker; to ensure that our Logging trait is the one in use in the
> driver
> > (where it matters), we exclude that same class from the Spark JAR in the
> > shade plugin configuration.
> >
> > Best.
> > -- Paul
> >
> >
> > --
> > p...@mult.ifario.us | Multifarious, Inc. | http://mult.ifario.us/
> >
> >
> > On Wed, Mar 5, 2014 at 10:02 AM, Sergey Parhomenko <
> sparhome...@gmail.com>
> > wrote:
> >>
> >> Hi Sean,
> >>
> >> We're not using log4j actually, we're trying to redirect all logging to
> >> slf4j which then uses logback as the logging implementation.
> >>
> >> The fix you mentioned - am I right to assume it is not part of the
> latest
> >> released Spark version (0.9.0)? If so, are there any workarounds or
> advices
> >> on how to avoid this issue in 0.9.0?
> >>
> >> --
> >> Best regards,
> >> Sergey Parhomenko
> >>
> >>
> >> On 5 March 2014 14:40, Sean Owen  wrote:
> >>>
> >>> Yes I think that issue is fixed (Patrick you had the last eyes on it
> >>> IIRC?)
> >>>
> >>> If you are using log4j, in general, do not redirect log4j to slf4j.
> >>> Stuff using log4j is already using log4j, done.
> >>> --
> >>> Sean Owen | Director, Data Science | London
> >>>
> >>>
> >>> On Wed, Mar 5, 2014 at 1:12 PM, Sergey Parhomenko <
> sparhome...@gmail.com>
> >>> wrote:
> >>> > Hi,
> >>> >
> >>> > I'm trying to redirect Spark logs to slf4j. Spark seem to be using
> >>> > Log4J, so
> >>> > I did the typical steps of forcing a Log4J-based framework to use
> slf4j
> >>> > -
> >>> > manually excluded slf4j-log4j12 and log4j, and included
> >>> > log4j-over-slf4j.
> >>> > When doing that however Spark starts failing on initialization with:
> >>> > java.lang.StackOverflowError
> >>> > at java.lang.ThreadLocal.access$400(ThreadLocal.java:72)
> >>> > at
> java.lang.ThreadLocal$ThreadLocalMap.getEntry(ThreadLocal.java:376)
> >>> > at
> >>> > java.lang.ThreadLocal$ThreadLocalMap.access$000(ThreadLocal.java:261)
> >>> > at java.lang.ThreadLocal.get(ThreadLocal.java:146)
> >>> > at java.lang.StringCoding.deref(StringCoding.java:63)
> >>> > at java.lang.StringCoding.encode(StringCoding.java:330)

Re: Unable to redirect Spark logs to slf4j

2014-03-05 Thread Sergey Parhomenko
Hi Sean,

We're not using log4j actually, we're trying to redirect all logging to
slf4j which then uses logback as the logging implementation.

The fix you mentioned - am I right to assume it is not part of the latest
released Spark version (0.9.0)? If so, are there any workarounds or advice
on how to avoid this issue in 0.9.0?

--
Best regards,
Sergey Parhomenko


On 5 March 2014 14:40, Sean Owen  wrote:

> Yes I think that issue is fixed (Patrick you had the last eyes on it IIRC?)
>
> If you are using log4j, in general, do not redirect log4j to slf4j.
> Stuff using log4j is already using log4j, done.
> --
> Sean Owen | Director, Data Science | London
>
>
> On Wed, Mar 5, 2014 at 1:12 PM, Sergey Parhomenko 
> wrote:
> > Hi,
> >
> > I'm trying to redirect Spark logs to slf4j. Spark seem to be using
> Log4J, so
> > I did the typical steps of forcing a Log4J-based framework to use slf4j -
> > manually excluded slf4j-log4j12 and log4j, and included log4j-over-slf4j.
> > When doing that however Spark starts failing on initialization with:
> > java.lang.StackOverflowError
> > at java.lang.ThreadLocal.access$400(ThreadLocal.java:72)
> > at java.lang.ThreadLocal$ThreadLocalMap.getEntry(ThreadLocal.java:376)
> > at java.lang.ThreadLocal$ThreadLocalMap.access$000(ThreadLocal.java:261)
> > at java.lang.ThreadLocal.get(ThreadLocal.java:146)
> > at java.lang.StringCoding.deref(StringCoding.java:63)
> > at java.lang.StringCoding.encode(StringCoding.java:330)
> > at java.lang.String.getBytes(String.java:916)
> > at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
> > at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
> > at java.io.File.exists(File.java:813)
> > at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1080)
> > at sun.misc.URLClassPath$FileLoader.findResource(URLClassPath.java:1047)
> > at sun.misc.URLClassPath.findResource(URLClassPath.java:176)
> > at java.net.URLClassLoader$2.run(URLClassLoader.java:551)
> > at java.net.URLClassLoader$2.run(URLClassLoader.java:549)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findResource(URLClassLoader.java:548)
> > at java.lang.ClassLoader.getResource(ClassLoader.java:1147)
> > at org.apache.spark.Logging$class.initializeLogging(Logging.scala:109)
> > at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:97)
> > at org.apache.spark.Logging$class.log(Logging.scala:36)
> > at org.apache.spark.util.Utils$.log(Utils.scala:47)
> > 
> >
> > There's some related work done in SPARK-1071, but it was resolved after
> > 0.9.0 was released. In the last comment Sean refers to a
> StackOverflowError
> > which was discussed in the mailing list, I assume it might be a problem
> > similar to mine but I was not able to find that discussion.
> > Is anyone aware of a way to redirect Spark 0.9.0 logs to slf4j?
> >
> > --
> > Best regards,
> > Sergey Parhomenko
>


Unable to redirect Spark logs to slf4j

2014-03-05 Thread Sergey Parhomenko
Hi,

I'm trying to redirect Spark logs to slf4j. Spark seems to be using Log4J,
so I did the typical steps of forcing a Log4J-based framework to use slf4j
- manually excluded slf4j-log4j12 and log4j, and included log4j-over-slf4j.
When doing that, however, Spark starts failing on initialization with:
java.lang.StackOverflowError
 at java.lang.ThreadLocal.access$400(ThreadLocal.java:72)
at java.lang.ThreadLocal$ThreadLocalMap.getEntry(ThreadLocal.java:376)
 at java.lang.ThreadLocal$ThreadLocalMap.access$000(ThreadLocal.java:261)
at java.lang.ThreadLocal.get(ThreadLocal.java:146)
 at java.lang.StringCoding.deref(StringCoding.java:63)
at java.lang.StringCoding.encode(StringCoding.java:330)
 at java.lang.String.getBytes(String.java:916)
at java.io.UnixFileSystem.getBooleanAttributes0(Native Method)
 at java.io.UnixFileSystem.getBooleanAttributes(UnixFileSystem.java:242)
at java.io.File.exists(File.java:813)
 at sun.misc.URLClassPath$FileLoader.getResource(URLClassPath.java:1080)
at sun.misc.URLClassPath$FileLoader.findResource(URLClassPath.java:1047)
 at sun.misc.URLClassPath.findResource(URLClassPath.java:176)
at java.net.URLClassLoader$2.run(URLClassLoader.java:551)
 at java.net.URLClassLoader$2.run(URLClassLoader.java:549)
at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findResource(URLClassLoader.java:548)
at java.lang.ClassLoader.getResource(ClassLoader.java:1147)
 at org.apache.spark.Logging$class.initializeLogging(Logging.scala:109)
at org.apache.spark.Logging$class.initializeIfNecessary(Logging.scala:97)
 at org.apache.spark.Logging$class.log(Logging.scala:36)
at org.apache.spark.util.Utils$.log(Utils.scala:47)
 

There's some related work done in
SPARK-1071 <https://spark-project.atlassian.net/browse/SPARK-1071>,
but it was resolved after 0.9.0 was released. In the last comment Sean
refers to a StackOverflowError which was discussed on the mailing list; I
assume it might be a problem similar to mine, but I was not able to find
that discussion.
Is anyone aware of a way to redirect Spark 0.9.0 logs to slf4j?

--
Best regards,
Sergey Parhomenko