Split a row into multiple rows Java

2018-07-25 Thread nookala
I'm trying to generate multiple rows from a single row.

I have a DataFrame with the schema

Name Id Date 0100 0200 0300 0400

and would like to turn it into a vertical format with the schema

Name Id Date Time

I have the code below and get the following error:

Caused by: java.lang.RuntimeException:
org.apache.spark.sql.catalyst.expressions.GenericRow is not a valid external
type for schema of string
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

    StructType schemata = DataTypes.createStructType(
        new StructField[]{
            DataTypes.createStructField("Name", DataTypes.StringType, false),
            DataTypes.createStructField("Id", DataTypes.StringType, false),
            DataTypes.createStructField("Date", DataTypes.StringType, false),
            DataTypes.createStructField("Time", DataTypes.StringType, false)
        }
    );

    ExpressionEncoder<Row> encoder = RowEncoder.apply(schemata);

    Dataset<Row> modifiedRDD = intervalDF.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row row) throws Exception {
            List<Row> rowList = new ArrayList<>();
            String[] timeList = {"0100", "0200", "0300", "0400"};
            for (String time : timeList) {
                Row r1 = RowFactory.create(row.getAs("sdp_id"),
                        "WGL",
                        row.getAs("Name"),
                        row.getAs("Id"),
                        row.getAs("Date"),
                        timeList[0],
                        row.getAs(timeList[0]));
                // updated row by creating new Row
                rowList.add(RowFactory.create(r1));
            }
            return rowList.iterator();
        }
    }, encoder);

    modifiedRDD.write().csv("file:///Users/mod/out");
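
For anyone who hits the same trace: the encoder declares four String columns, so
each emitted Row has to carry exactly those four String values. Wrapping the
already-built r1 in a second RowFactory.create(...) call produces a one-field Row
whose value is itself a GenericRow, which is what the "not a valid external type
for schema of string" message is complaining about. A minimal sketch of a flatMap
that matches the declared schema might look like the following; it assumes
intervalDF really has Name, Id and Date columns, as suggested above:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;

    Dataset<Row> vertical = intervalDF.flatMap((FlatMapFunction<Row, Row>) row -> {
        List<Row> out = new ArrayList<>();
        for (String time : new String[]{"0100", "0200", "0300", "0400"}) {
            // Build the output Row directly with exactly the four schema fields;
            // do not wrap it in another RowFactory.create(...) call.
            out.add(RowFactory.create(
                    row.getAs("Name"),
                    row.getAs("Id"),
                    row.getAs("Date"),
                    time));   // or row.getAs(time) if the cell value is wanted instead
        }
        return out.iterator();
    }, RowEncoder.apply(schemata));

    vertical.write().csv("file:///Users/mod/out");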
  






Re: Use Arrow instead of Pickle without pandas_udf

2018-07-25 Thread Hichame El Khalfi
Hey Holden,
Thanks for your reply,

We are currently using a Python function that produces a Row(TS=LongType(),
bin=BinaryType()).
We use this function like this:
dataframe.rdd.map(my_function).toDF().write.parquet()

To reuse it in pandas_udf, we changed the return type to
StructType(StructField(Long), StructField(BinaryType)).

1) But we ran into the issue that StructType is not supported as a return type
by pandas_udf.

So I was wondering whether we could still reuse dataframe.rdd.map but get an
improvement in serialization by using the Arrow format instead of Pickle.

From: hol...@pigscanfly.ca
Sent: July 25, 2018 4:41 PM
To: hich...@elkhalfi.com
Cc: user@spark.apache.org
Subject: Re: Use Arrow instead of Pickle without pandas_udf


Not currently. What's the problem with pandas_udf for your use case?

On Wed, Jul 25, 2018 at 1:27 PM, Hichame El Khalfi
<hich...@elkhalfi.com> wrote:

Hi There,


Is there a way to use Arrow format instead of Pickle but without using 
pandas_udf ?


Thanks for your help,


Hichame



--
Twitter: https://twitter.com/holdenkarau


Re: Use Arrow instead of Pickle without pandas_udf

2018-07-25 Thread Holden Karau
Not currently. What's the problem with pandas_udf for your use case?

On Wed, Jul 25, 2018 at 1:27 PM, Hichame El Khalfi 
wrote:

> Hi There,
>
>
> Is there a way to use Arrow format instead of Pickle but without using
> pandas_udf ?
>
>
> Thanks for your help,
>
>
> Hichame
>



-- 
Twitter: https://twitter.com/holdenkarau


Use Arrow instead of Pickle without pandas_udf

2018-07-25 Thread Hichame El Khalfi
Hi There,


Is there a way to use Arrow format instead of Pickle but without using 
pandas_udf ?


Thanks for your help,


Hichame


Backpressure initial rate not working

2018-07-25 Thread Biplob Biswas
I have enabled the spark.streaming.backpressure.enabled setting and also set
spark.streaming.backpressure.initialRate to 15000, but my Spark job is not
respecting these settings when reading from Kafka after a failure.

In my Kafka topic, around 500k records are waiting to be processed, and they
are all taken in one huge batch, which ultimately takes a long time and fails
with an executor-failure exception. We don't have more resources to give our
test cluster, and we expect backpressure to kick in and take smaller batches.

What could I be doing wrong?
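
For completeness, here is a sketch of the relevant configuration as I understand
it (the numbers are illustrative, not tuned for this job). As far as I know,
spark.streaming.backpressure.initialRate is not always applied to the direct
Kafka stream, so the usual safeguard is the hard per-partition cap
spark.streaming.kafka.maxRatePerPartition, which also bounds the first batch
after a restart:

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class BackpressureConfSketch {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf()
                .setAppName("backpressure-sketch")
                // Enables rate estimation for batches after the first one.
                .set("spark.streaming.backpressure.enabled", "true")
                .set("spark.streaming.backpressure.initialRate", "15000")
                // Hard cap in records per second per Kafka partition; this also
                // limits the very first batch after a restart, where a 500k-record
                // backlog would otherwise arrive as one huge batch.
                .set("spark.streaming.kafka.maxRatePerPartition", "1000");

            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
            // ... create the Kafka direct stream, define the processing, then:
            jssc.start();
            jssc.awaitTermination();
        }
    }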


Thanks & Regards
Biplob Biswas


Re: [Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking

2018-07-25 Thread Yuval.Itzchakov
We're experiencing the exact same issue while running load tests on Spark
2.3.1 with Structured Streaming and `mapGroupsWithState`.






Re: Bug in Window Function

2018-07-25 Thread Jacek Laskowski
Hi Elior,

Could you show the query that led to the exception?

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Wed, Jul 25, 2018 at 10:04 AM, Elior Malul  wrote:

> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> collect_set(named_struct(value, country#123 AS value#346, count,
> (cast(count(country#123) windowspecdefinition(campaign_id#104,
> app_id#93, country#123, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED
> FOLLOWING) as double) / cast(count(1) windowspecdefinition(campaign_id#104,
> app_id#93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
> as double)) AS count#349) AS histogram_country#350, 0, 0)
> windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN
> UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS
> collect_set(named_struct(NamePlaceholder(),
> country AS `value`, NamePlaceholder(), (CAST(count(country) OVER (PARTITION
> BY campaign_id, app_id, country UnspecifiedFrame) AS DOUBLE) /
> CAST(count(1) OVER (PARTITION BY campaign_id, app_id UnspecifiedFrame) AS
> DOUBLE)) AS `count`) AS `histogram_country`) OVER (PARTITION BY
> campaign_id, app_id UnspecifiedFrame)#352 has multiple Window
> Specifications (ArrayBuffer(windowspecdefinition(campaign_id#104,
> app_id#93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
> windowspecdefinition(campaign_id#104, app_id#93, country#123, ROWS
> BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))).Please file a
> bug report with this error message, stack trace, and the query.;
>


Bug in Window Function

2018-07-25 Thread Elior Malul
Exception in thread "main" org.apache.spark.sql.AnalysisException:
collect_set(named_struct(value, country#123 AS value#346, count,
(cast(count(country#123) windowspecdefinition(campaign_id#104, app_id#93,
country#123, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as
double) / cast(count(1) windowspecdefinition(campaign_id#104, app_id#93,
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as double)) AS
count#349) AS histogram_country#350, 0, 0)
windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING) AS
collect_set(named_struct(NamePlaceholder(), country AS `value`,
NamePlaceholder(), (CAST(count(country) OVER (PARTITION BY campaign_id,
app_id, country UnspecifiedFrame) AS DOUBLE) / CAST(count(1) OVER (PARTITION BY
campaign_id, app_id UnspecifiedFrame) AS DOUBLE)) AS `count`) AS
`histogram_country`) OVER (PARTITION BY campaign_id, app_id
UnspecifiedFrame)#352 has multiple Window Specifications
(ArrayBuffer(windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING),
windowspecdefinition(campaign_id#104, app_id#93, country#123, ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))).Please file a bug report
with this error message, stack trace, and the query.;



How does a Spark Streaming program call a Python file

2018-07-25 Thread 康逸之

I am trying to build a real-time system with Spark (written in Scala), but
some of the algorithm files are written in Python. How can I call these
Python files from the streaming job?

Any idea how to make this work?


