Split a row into multiple rows (Java)
I'm trying to generate multiple rows from a single row. The input schema is (Name, Id, Date, 0100, 0200, 0300, 0400), and I would like to unpivot it into a vertical format with the schema (Name, Id, Date, Time). With my original code I got the error below; the corrected version follows (the comments in the loop note what was wrong):

Caused by: java.lang.RuntimeException: org.apache.spark.sql.catalyst.expressions.GenericRow is not a valid external type for schema of string
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StructType schemata = DataTypes.createStructType(new StructField[]{
    DataTypes.createStructField("Name", DataTypes.StringType, false),
    DataTypes.createStructField("Id", DataTypes.StringType, false),
    DataTypes.createStructField("Date", DataTypes.StringType, false),
    DataTypes.createStructField("Time", DataTypes.StringType, false)
});

ExpressionEncoder<Row> encoder = RowEncoder.apply(schemata);

Dataset<Row> modifiedRDD = intervalDF.flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Row row) throws Exception {
        List<Row> rowList = new ArrayList<>();
        String[] timeList = {"0100", "0200", "0300", "0400"};
        for (String time : timeList) {
            // Build a Row with exactly the four fields of the target schema.
            // The original code packed seven values (including sdp_id, "WGL"
            // and row.getAs(time)) into the Row and then wrapped it again with
            // RowFactory.create(r1); that single-field Row holding a GenericRow
            // is what triggers the "not a valid external type" error. Extend
            // the schema if the per-time value column is also needed.
            rowList.add(RowFactory.create(
                row.getAs("Name"),
                row.getAs("Id"),
                row.getAs("Date"),
                time));
        }
        return rowList.iterator();
    }
}, encoder);

modifiedRDD.write().csv("file:///Users/mod/out");
Re: Use Arrow instead of Pickle without pandas_udf
Hey Holden,

Thanks for your reply. We're currently using a Python function that produces a Row(TS=LongType(), bin=BinaryType()). We use it like this:

dataframe.rdd.map(my_function).toDF().write.parquet()

To reuse it in a pandas_udf, we changed the return type to StructType([StructField("TS", LongType()), StructField("bin", BinaryType())]), but we hit the issue that StructType is not supported as a pandas_udf return type. So I was wondering whether we could keep using dataframe.rdd.map but still get an improvement in serialization by using the Arrow format instead of Pickle.

Hichame
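[Editor's note: one workaround that may apply here, as a hedged sketch rather than anything confirmed in this thread: on Spark 2.3 the scalar pandas_udf cannot return a StructType, but the grouped-map pandas_udf returns a whole pandas DataFrame, so a multi-column result can still travel over Arrow. The input column names ts_col and payload and the grouping key some_key are hypothetical; BinaryType support over Arrow also varies by Spark/Arrow version.]

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, LongType, BinaryType

out_schema = StructType([
    StructField("TS", LongType()),
    StructField("bin", BinaryType()),
])

# Grouped-map pandas_udf: receives one pandas DataFrame per group and must
# return a pandas DataFrame whose columns match out_schema.
@pandas_udf(out_schema, PandasUDFType.GROUPED_MAP)
def my_function_grouped(pdf):
    return pd.DataFrame({
        "TS": pdf["ts_col"].astype("int64"),   # hypothetical input column
        "bin": pdf["payload"].apply(bytes),    # hypothetical input column
    })

# dataframe.groupBy("some_key").apply(my_function_grouped).write.parquet(...)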
Re: Use Arrow instead of Pickle without pandas_udf
Not currently. What's the problem with pandas_udf for your use case?

--
Twitter: https://twitter.com/holdenkarau
Use Arrow instead of Pickle without pandas_udf
Hi there,

Is there a way to use the Arrow format instead of Pickle, but without using pandas_udf?

Thanks for your help,

Hichame
Backpressure initial rate not working
I have enabled spark.streaming.backpressure.enabled and also set spark.streaming.backpressure.initialRate to 15000, but my Spark job does not respect these settings when reading from Kafka after a failure. Around 500k records are waiting in my Kafka topic to be processed, and they are all taken in one huge batch, which ultimately takes a long time and fails with an executor failure exception. We don't have more resources to give in our test cluster, and we expected backpressure to kick in and take smaller batches. What could I be doing wrong?

Thanks & Regards
Biplob Biswas
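[Editor's note: a hedged configuration sketch, not from this thread: on the direct Kafka DStream in many Spark 2.x releases, spark.streaming.backpressure.initialRate is not applied, and spark.streaming.kafka.maxRatePerPartition is the setting that bounds the first post-restart batch. The rate values below are illustrative.]

from pyspark import SparkConf

conf = (
    SparkConf()
    .set("spark.streaming.backpressure.enabled", "true")
    # Only honored by receiver-based streams on many 2.x versions:
    .set("spark.streaming.backpressure.initialRate", "15000")
    # Caps every batch (including the first one after a restart) for the
    # direct Kafka stream: records per partition per second.
    .set("spark.streaming.kafka.maxRatePerPartition", "1000")
)
# Pass conf to the SparkContext / StreamingContext as usual.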
Re: [Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking
We're experiencing the exact same issue while running load tests on Spark 2.3.1 with Structured Streaming and `mapGroupsWithState`.
Re: Bug in Window Function
Hi Elior,

Could you show the query that led to the exception?

Best regards,
Jacek Laskowski
https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski
Bug in Window Function
Exception in thread "main" org.apache.spark.sql.AnalysisException: collect_set(named_struct(value, country#123 AS value#346, count, (cast(count(country#123) windowspecdefinition(campaign_id#104, app_id#93, country#123, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as double) / cast(count(1) windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) as double)) AS count#349) AS histogram_country#350, 0, 0) windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS collect_set(named_struct(NamePlaceholder(), country AS `value`, NamePlaceholder(), (CAST(count(country) OVER (PARTITION BY campaign_id, app_id, country UnspecifiedFrame) AS DOUBLE) / CAST(count(1) OVER (PARTITION BY campaign_id, app_id UnspecifiedFrame) AS DOUBLE)) AS `count`) AS `histogram_country`) OVER (PARTITION BY campaign_id, app_id UnspecifiedFrame)#352 has multiple Window Specifications (ArrayBuffer(windowspecdefinition(campaign_id#104, app_id#93, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING), windowspecdefinition(campaign_id#104, app_id#93, country#123, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING))). Please file a bug report with this error message, stack trace, and the query.;
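[Editor's note: the expressions in the message suggest a query that nests counts over two different window specifications inside one collect_set. A hedged PySpark reconstruction follows, with column names taken from the exception text; the DataFrame df is assumed to exist with campaign_id, app_id and country columns.]

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w_app = Window.partitionBy("campaign_id", "app_id")
w_country = Window.partitionBy("campaign_id", "app_id", "country")

# Two different window specs nested inside a single aggregate, which is
# what the AnalysisException complains about.
df = df.withColumn(
    "histogram_country",
    F.collect_set(
        F.struct(
            F.col("country").alias("value"),
            (F.count("country").over(w_country)
             / F.count(F.lit(1)).over(w_app)).alias("count"),
        )
    ).over(w_app),
)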
How does a Spark Streaming program call a Python file
I am trying to build a real-time system with Spark (written in Scala), but some of the algorithm files are written in Python. How can I call those files from the Scala job? Any idea how to make this work?
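[Editor's note: one common approach, as a hedged sketch rather than anything confirmed in this thread, is RDD.pipe(), which streams records through an external process over stdin/stdout; on the Scala side something like rdd.pipe("python algo.py") would invoke the script below. The file name algo.py and the score() function are hypothetical.]

#!/usr/bin/env python
# algo.py - hypothetical Python side of an RDD.pipe() exchange. Spark sends
# each record of the piped RDD to this script's stdin as one line, and reads
# every line written to stdout back as a record of the resulting RDD.
import sys

def score(record):
    # placeholder for the real algorithm
    return record.upper()

for line in sys.stdin:
    sys.stdout.write(score(line.rstrip("\n")) + "\n")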