Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread Tathagata Das
Yes. Yes you can.

On Tue, Jul 17, 2018 at 11:42 AM, Sathi Chowdhury  wrote:

> Hi,
> My question is about the ability to integrate Spark Streaming with multiple
> Kafka clusters. Is it a supported use case? For example, two topics are
> owned by different groups, each with its own Kafka infrastructure.
> Can I have two dataframes as a result of spark.readStream listening to
> different Kafka clusters in the same Spark Streaming job?
> Has anyone solved this use case before?
>
>
> Thanks.
> Sathi
>
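
For reference, a minimal sketch of what this can look like in Structured
Streaming (it needs the spark-sql-kafka-0-10 package on the classpath; the
broker addresses and topic names below are placeholders): each readStream call
carries its own kafka.bootstrap.servers, so sources from two different
clusters can coexist in the same job.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("TwoKafkaClusters").getOrCreate()

// Source from the first Kafka cluster (placeholder brokers and topic)
val streamA = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "clusterA-broker1:9092")
  .option("subscribe", "topicA")
  .load()

// Source from the second, independent Kafka cluster
val streamB = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "clusterB-broker1:9092")
  .option("subscribe", "topicB")
  .load()

// Both streaming DataFrames live in the same job and can be unioned or joined
val combined = streamA.selectExpr("CAST(value AS STRING)")
  .union(streamB.selectExpr("CAST(value AS STRING)"))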


Re: Dataframe from partitioned parquet table missing partition columns from schema

2018-07-17 Thread Nirav Patel
Just found out that I need the following option while reading:

.option("basePath", "hdfs://localhost:9000/ptest/")


https://stackoverflow.com/questions/43192940/why-is-partition-key-column-missing-from-dataframe
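
For reference, a minimal sketch of a read using that option; only the basePath
value comes from the thread, while the year/month partition columns are made
up for illustration:

// Without basePath, Spark treats the leaf directory as the table root and the
// partition columns vanish from the schema. Pointing basePath at the table
// root keeps them as regular columns that can be used in projections.
val df = spark.read
  .option("basePath", "hdfs://localhost:9000/ptest/")
  .parquet("hdfs://localhost:9000/ptest/year=2018/month=07")

df.printSchema()                               // year and month now appear
df.select("year", "month").distinct().show()   // and can be projected on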



On Tue, Jul 17, 2018 at 3:48 PM, Nirav Patel  wrote:

> I created a Hive table with Parquet storage using Spark SQL. Now in the Hive
> CLI, when I run DESCRIBE and SELECT, I can see the partition columns both as
> regular columns and as partition columns. However, if I try the same in
> Spark SQL (DataFrame), I don't see the partition columns.
>
> I need to do a projection on a partition column in Spark. How do I do that now?
>


Dataframe from partitioned parquet table missing partition columns from schema

2018-07-17 Thread Nirav Patel
I created a Hive table with Parquet storage using Spark SQL. Now in the Hive
CLI, when I run DESCRIBE and SELECT, I can see the partition columns both as
regular columns and as partition columns. However, if I try the same in
Spark SQL (DataFrame), I don't see the partition columns.

I need to do a projection on a partition column in Spark. How do I do that now?




Re: Pyspark access to scala/java libraries

2018-07-17 Thread Mohit Jaggi
Thanks 0xF0F0F0 and Ashutosh for the pointers.

Holden,
I am trying to look into sparklingml...what am I looking for? Also which
chapter/page of your book should I look at?

Mohit.

On Sun, Jul 15, 2018 at 3:02 AM Holden Karau  wrote:

> If you want to see some examples, the sparklingml library shows a way to do
> it - https://github.com/sparklingpandas/sparklingml - and High Performance
> Spark also talks about it.
>
> On Sun, Jul 15, 2018, 11:57 AM <0xf0f...@protonmail.com.invalid> wrote:
>
>> Check
>> https://stackoverflow.com/questions/31684842/calling-java-scala-function-from-a-task
>>
>> ​Sent with ProtonMail Secure Email.​
>>
>> ‐‐‐ Original Message ‐‐‐
>>
>> On July 15, 2018 8:01 AM, Mohit Jaggi  wrote:
>>
>> > Trying again…anyone know how to make this work?
>> >
>> > > On Jul 9, 2018, at 3:45 PM, Mohit Jaggi mohitja...@gmail.com wrote:
>> > >
>> > > Folks,
>> > >
>> > > I am writing some Scala/Java code and want it to be usable from
>> pyspark.
>> > >
>> > > For example:
>> > >
>> > > class MyStuff(addend: Int) {
>> > >
>> > > def myMapFunction(x: Int) = x + addend
>> > >
>> > > }
>> > >
>> > > I want to call it from pyspark as:
>> > >
>> > > df = ...
>> > >
>> > > mystuff = sc._jvm.MyStuff(5)
>> > >
>> > > df[‘x’].map(lambda x: mystuff.myMapFunction(x))
>> > >
>> > > How can I do this?
>> > >
>> > > Mohit.
>> >
>> > --
>> >
>> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
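
One common pattern, sketched here in Scala (an assumption for illustration,
not the sparklingml approach or code from the book): register the Scala
function as a Spark SQL UDF on the JVM side, then invoke it from PySpark by
name, which avoids a Py4J round trip per row like the mystuff.myMapFunction
lambda above. The MyStuffUDF object name is made up.

import org.apache.spark.sql.SparkSession

class MyStuff(addend: Int) extends Serializable {
  def myMapFunction(x: Int): Int = x + addend
}

// Hypothetical helper that exposes the Scala logic as a SQL UDF
object MyStuffUDF {
  def register(spark: SparkSession, addend: Int): Unit = {
    val stuff = new MyStuff(addend)
    spark.udf.register("myMapFunction", (x: Int) => stuff.myMapFunction(x))
  }
}

On the Python side, after shipping the jar with --jars, something along the
lines of spark._jvm.MyStuffUDF.register(spark._jsparkSession, 5) followed by
df.selectExpr("myMapFunction(x)") should then run the mapping on the
executors; treat the exact handle names as assumptions to verify.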


Re: Heap Memory in Spark 2.3.0

2018-07-17 Thread Imran Rashid
perhaps this is https://issues.apache.org/jira/browse/SPARK-24578?

that was reported as a performance issue, not OOMs, but it's in the exact
same part of the code and the change was to reduce the memory pressure
significantly.

On Mon, Jul 16, 2018 at 1:43 PM, Bryan Jeffrey 
wrote:

> Hello.
>
> I am working to move our system from Spark 2.1.0 to Spark 2.3.0.  Our
> system is running on Spark managed via Yarn.  During the course of the move
> I mirrored the settings to our new cluster.  However, on the Spark 2.3.0
> cluster with the same resource allocation I am seeing a number of executors
> die due to OOM:
>
> 18/07/16 17:23:06 ERROR YarnClusterScheduler: Lost executor 5 on wn80:
> Container killed by YARN for exceeding memory limits. 22.0 GB of 22 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
>
> I increased spark.driver.memoryOverhead and spark.executor.memoryOverhead
> from the default (384) to 2048.  I went ahead and disabled vmem and pmem
> Yarn checks on the cluster.  With that disabled I see the following error:
>
> Caused by: java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>   at 
> io.netty.buffer.CompositeByteBuf.nioBuffer(CompositeByteBuf.java:1466)
>   at io.netty.buffer.AbstractByteBuf.nioBuffer(AbstractByteBuf.java:1203)
>   at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:140)
>   at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
>   at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
>   at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
>   at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
>   at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
>   at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at 
> io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
>   at 
> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
>   at 
> io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831)
>   at 
> io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041)
>   at 
> io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.respond(TransportRequestHandler.java:222)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.processFetchRequest(TransportRequestHandler.java:146)
>   at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109)
>   at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
>
>
>
> Looking at GC:
>
>[Eden: 16.0M(8512.0M)->0.0B(8484.0M) Survivors: 4096.0K->4096.0K Heap: 
> 8996.7M(20.0G)->8650.3M(20.0G)]
>  [Times: user=0.03 sys=0.01, real=0.01 secs]
>  794.949: [G1Ergonomics (Heap Sizing) attempt heap expansion, reason: 
> allocation request failed, allocation request: 401255000 bytes]
>  794.949: [G1Ergonomics (Heap Sizing) expand the heap, requested expansion 
> amount: 401255000 bytes, attempted expansion amount: 402653184 bytes]
>  794.949: [G1Ergonomics (Heap Sizing) did not expand the heap, reason: heap 
> already fully expanded]
> 
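
For completeness, the overhead settings discussed above expressed as a
SparkConf sketch; the values simply mirror the thread (2048 MB overhead, a
roughly 20 GB heap implied by the GC log) and are not a recommendation:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "20g")           // heap size implied by the GC log
  .set("spark.executor.memoryOverhead", "2048")  // Spark 2.3 name; the yarn-prefixed key is the deprecated alias
  .set("spark.driver.memoryOverhead", "2048")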

joining streams from multiple kafka clusters

2018-07-17 Thread sathich
Hi,
My question is about the ability to integrate Spark Streaming with multiple
Kafka clusters. Is it a supported use case? For example, two topics are
owned by different groups, each with its own Kafka infrastructure.
Can I have two dataframes as a result of spark.readStream listening to
different Kafka clusters in the same Spark Streaming job?
Has anyone solved this use case before?


Thanks.
Sathi



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread sathich
This may work:

val df_post = listCustomCols
  .foldLeft(df_pre) { (tempDF, listValue) =>
    tempDF.withColumn(
      listValue.name,
      new Column(listValue.name.toString + funcUDF(listValue.name))
    )
  }

and outsource the renaming to a UDF.

Or you can rename the column of one of the datasets before the join itself.
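
A minimal sketch of that second suggestion (the DataFrame and column names are
placeholders):

// Rename the clashing column on one side before joining, so both columns
// survive in the joined result without ambiguity.
val rightRenamed = right.withColumnRenamed("id", "right_id")
val joined = left.join(rightRenamed, left("id") === rightRenamed("right_id"))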



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Spark streaming connecting to two kafka clusters

2018-07-17 Thread Sathi Chowdhury
Hi,
My question is about the ability to integrate Spark Streaming with multiple
Kafka clusters. Is it a supported use case? For example, two topics are owned
by different groups, each with its own Kafka infrastructure. Can I have two
dataframes as a result of spark.readStream listening to different Kafka
clusters in the same Spark Streaming job? Has anyone solved this use case
before?

Thanks,
Sathi

Re: Dataset - withColumn and withColumnRenamed that accept Column type

2018-07-17 Thread Sathi Chowdhury
Hi,
My question is about the ability to integrate Spark Streaming with multiple
Kafka clusters. Is it a supported use case? For example, two topics are owned
by different groups, each with its own Kafka infrastructure. Can I have two
dataframes as a result of spark.readStream listening to different Kafka
clusters in the same Spark Streaming job? Has anyone solved this use case
before?

Thanks,
Sathi

Re: Running Production ML Pipelines

2018-07-17 Thread Shmuel Blitz
Hi,

This is a very general question. It's hard to answer your question without
fully understanding your business and technological needs.

You might want to watch this video:
https://www.youtube.com/watch?v=2UKSLHDH5vc=8s

Shmuel

On Tue, Jul 17, 2018 at 12:11 AM Gautam Singaraju <
gautam.singar...@gmail.com> wrote:

> Hi all,
>
> Any suggestions on optimizing ML Pipeline inference in a webapp in a
> multi-tenant, low-latency mode?
>
> Suggestions would be appreciated.
>
> Thank you!
>


-- 
Shmuel Blitz
Big Data Developer
Email: shmuel.bl...@similarweb.com
www.similarweb.com


Re:Re: spark sql data skew

2018-07-17 Thread 崔苗
30 GB of user data. How do I get the distinct user count after creating a
composite key based on company and userId?

On 2018-07-13 18:24:52, Jean Georges Perrin wrote:
Just thinking out loud… repartition by key? Create a composite key based on
company and userId?

How big is your dataset?

On Jul 13, 2018, at 06:20, 崔苗  wrote:

Hi,
when I count(distinct userId) by company, I run into data skew and the task
takes too long. How can I count distinct by key on skewed data in Spark SQL?


thanks for any reply
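
Building on the composite-key suggestion above, a sketch of a two-stage count
that usually spreads a skewed count(distinct) across the cluster; df stands
for the 30 GB user dataset and the company/userId column names are assumed:

import org.apache.spark.sql.functions._

// Stage 1: distinct on the (company, userId) pair shuffles on many keys
// instead of piling every row for one large company onto a single task.
val distinctUsers = df.select(col("company"), col("userId")).distinct()

// Stage 2: the per-company count now sees only one row per distinct user.
val usersPerCompany = distinctUsers.groupBy("company").count()

// If an approximate answer is acceptable, approx_count_distinct is cheaper:
val approx = df.groupBy("company")
  .agg(approx_count_distinct("userId").as("distinct_users"))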


Query on Profiling Spark Code

2018-07-17 Thread Aakash Basu
Hi guys,

I'm trying to profile my Spark code with cProfile to see where the most time
is spent. I found that the most time is taken by some socket object, and I'm
quite clueless as to where it is used.

Can anyone shed some light on this?


ncalls   tottime   percall   cumtime   percall   filename:lineno(function)
11789    479.8     0.0407    479.8     0.0407    ~:0()


Thanks,
Aakash.