Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming
I made a JIRA for it - https://issues.apache.org/jira/browse/SPARK-23539

Unfortunately it is blocked by the Kafka version upgrade, which has a few nasty issues related to Kafka bugs - https://issues.apache.org/jira/browse/SPARK-18057

On Wed, Feb 28, 2018 at 3:17 PM, karthikus wrote:
> TD,
>
> Thanks for your response.
[Structured Streaming] Handling Kafka Stream messages with different JSON Schemas.
Hi all,

I have the following code to stream Kafka data, apply a schema called "hbSchema" to it, and then act on the data.

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.102.255.241:9092")
      .option("subscribe", "mabr_avro_json1")
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("""deserialize("avro_json1", value) AS message""")

    import spark.implicits._

    val df1 = df
      .selectExpr("cast (value as string) as json")
      .select(from_json($"message", schema=hbSchema).as("data"))
      .select("data.*")

But what if the data in the Kafka topic has different schemas? How do I apply different schemas based on the data?
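For example, would it be reasonable to filter on a discriminator field and apply a different schema per branch, roughly like the sketch below? This assumes every message carried such a field (here a hypothetical "msgType") and that I had a second schema such as "metricsSchema" defined.

    import org.apache.spark.sql.functions._

    // Sketch: route each JSON payload to a schema based on a hypothetical
    // discriminator field ("msgType") before parsing it with from_json.
    val json = df.selectExpr("cast(value as string) as json")

    val heartbeats = json
      .filter(get_json_object($"json", "$.msgType") === "heartbeat")
      .select(from_json($"json", hbSchema).as("data"))
      .select("data.*")

    val metrics = json
      .filter(get_json_object($"json", "$.msgType") === "metrics")
      .select(from_json($"json", metricsSchema).as("data"))
      .select("data.*")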
Using Thrift with Dataframe
Hi guys,

I have an RDD of Thrift structs. I want to convert it into a DataFrame. Can someone suggest how I can do this?

Thanks
Nikhil
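For example, is mapping the Thrift objects into a plain case class the right way? A rough sketch of what I mean, assuming a SparkSession named spark is in scope (MyThriftStruct, its getters, and thriftRdd below are just placeholders for my generated class and my actual RDD):

    import spark.implicits._

    // Placeholder case class mirroring the fields of the Thrift struct.
    case class MyRow(id: Long, name: String)

    // Sketch: map each Thrift object into the case class, then let Spark
    // build a DataFrame from the resulting RDD.
    val rowRdd = thriftRdd.map(t => MyRow(t.getId, t.getName))
    val df = rowRdd.toDF()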
Re: [Beginner] Kafka 0.11 header support in Spark Structured Streaming
TD,

Thanks for your response.
Re: [Beginner] How to save Kafka Dstream data to parquet ?
There is no good way to save to parquet without causing downstream consistency issues. You could use foreachRDD to get each RDD, convert it to a DataFrame/Dataset, and write it out as parquet files. But you will later run into issues with partial files caused by failures, etc.

On Wed, Feb 28, 2018 at 11:09 AM, karthikus wrote:
> Hi all,
>
> I have Kafka stream data and I need to save the data in parquet format
> without using Structured Streaming (due to the lack of Kafka message header
> support).
>
>     val kafkaStream = KafkaUtils.createDirectStream(
>       streamingContext,
>       LocationStrategies.PreferConsistent,
>       ConsumerStrategies.Subscribe[String, String](
>         topics,
>         kafkaParams
>       )
>     )
>     // process the messages
>     val messages = kafkaStream.map(record => (record.key, record.value))
>     val lines = messages.map(_._2)
>
> Now, how do I save it as parquet? All the examples that I have come across
> use SQLContext, which is deprecated! Any help appreciated!
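Following up with a minimal sketch of the foreachRDD approach mentioned above, assuming a SparkSession named spark is in scope and using your DStream of values, lines (the output path is illustrative):

    import spark.implicits._

    // Sketch: for every micro-batch, convert the RDD of message values
    // into a Dataset and append it to a Parquet directory. As noted above,
    // a failure mid-write can still leave partial files behind.
    lines.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        rdd.toDS().write.mode("append").parquet("/data/kafka/parquet")
      }
    }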
Re: org.apache.spark.SparkException: Task failed while writing rows
Yeah, without actually seeing what's happening on that line, it'd be difficult to say for sure. You can check what patches Hortonworks applied, and/or ask them.

And yeah, a seg fault is totally possible with any size of data. But you should've seen it in the `stdout` (assuming that the regular logs go to `stderr`).

On Wed, Feb 28, 2018 at 2:53 PM, unk1102 wrote:
> Hi Vadim, thanks. I use the Hortonworks package. I don't think there are any
> seg faults; the dataframe I am trying to write is very small in size. Can it
> still create a seg fault?
Re: [Beginner] How to save Kafka Dstream data to parquet ?
I don’t think SQL context is “deprecated” in this sense. It’s still accessible for compatibility with earlier versions of Spark. But yes, at first glance it looks like you are correct: I don’t see a recordWriter method for parquet outside of the SQL package.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

Here is an example that uses SQL context. I believe the SQL context is necessary for strongly typed, self-describing, binary, columnar-formatted files like Parquet.

https://community.hortonworks.com/articles/72941/writing-parquet-on-hdfs-using-spark-streaming.html

Otherwise you’ll probably be looking at a custom writer.

https://parquet.apache.org/documentation/latest/

AFAIK, if you were to implement a custom writer, you still wouldn’t escape the parquet formatting paradigm the DF API solves. Spark needs a way to map data types for Parquet conversion.

Hope this helps,
-Pat

On 2/28/18, 11:09 AM, "karthikus" wrote:

    Hi all,

    I have Kafka stream data and I need to save the data in parquet format
    without using Structured Streaming (due to the lack of Kafka message header
    support).

        val kafkaStream = KafkaUtils.createDirectStream(
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](
            topics,
            kafkaParams
          )
        )
        // process the messages
        val messages = kafkaStream.map(record => (record.key, record.value))
        val lines = messages.map(_._2)

    Now, how do I save it as parquet? All the examples that I have come across
    use SQLContext, which is deprecated! Any help appreciated!
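Something along these lines is what that pattern usually boils down to; a rough sketch only, using the DStream of values from your snippet (lines) and an illustrative output path:

    import org.apache.spark.sql.SparkSession

    // Sketch: obtain (or reuse) a SparkSession per batch, convert the RDD
    // of message values to a DataFrame, and append it as Parquet.
    lines.foreachRDD { rdd =>
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
      rdd.toDF("value").write.mode("append").parquet("hdfs:///tmp/kafka_parquet")
    }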
Re: org.apache.spark.SparkException: Task failed while writing rows
Hi Vadim, thanks. I use the Hortonworks package. I don't think there are any seg faults; the dataframe I am trying to write is very small in size. Can it still create a seg fault?
Re: org.apache.spark.SparkException: Task failed while writing rows
Who's your Spark provider? EMR, Azure, Databricks, etc.? Maybe contact them, since they've probably applied some patches.

Also, have you checked `stdout` for some segfaults? I vaguely remember getting `Task failed while writing rows at` and seeing some segfaults that caused that.

On Wed, Feb 28, 2018 at 2:07 PM, unk1102 wrote:
> Hi, thanks Vadim. You are right, I already saw line 468; I don't see any code
> there, it is just a comment. Yes, I am sure I am using all spark-* jars built
> for Spark 2.2.0 and Scala 2.11. I am also stuck, unfortunately, with these
> errors and not sure how to solve them.
[Beginner] How to save Kafka Dstream data to parquet ?
Hi all,

I have Kafka stream data and I need to save the data in parquet format without using Structured Streaming (due to the lack of Kafka message header support).

    val kafkaStream = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](
        topics,
        kafkaParams
      )
    )
    // process the messages
    val messages = kafkaStream.map(record => (record.key, record.value))
    val lines = messages.map(_._2)

Now, how do I save it as parquet? All the examples that I have come across use SQLContext, which is deprecated! Any help appreciated!
Re: org.apache.spark.SparkException: Task failed while writing rows
Hi, thanks Vadim. You are right, I already saw line 468; I don't see any code there, it is just a comment. Yes, I am sure I am using all spark-* jars built for Spark 2.2.0 and Scala 2.11. I am also stuck, unfortunately, with these errors and not sure how to solve them.
Re: org.apache.spark.SparkException: Task failed while writing rows
I'm sorry, I didn't see `Caused by: java.lang.NullPointerException at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)`.

Are you sure that you use 2.2.0? I don't see any code on that line:
https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L468

Also, it's pretty strange that it fails there.

On Wed, Feb 28, 2018 at 1:55 PM, unk1102 wrote:
> Hi, thanks for the reply. I only see the NPE and "Task failed while writing
> rows" all over the place. I don't see any other errors except the
> SparkException "job aborted", followed by the two exceptions I pasted earlier.
Re: org.apache.spark.SparkException: Task failed while writing rows
Hi, thanks for the reply. I only see the NPE and "Task failed while writing rows" all over the place. I don't see any other errors except the SparkException "job aborted", followed by the two exceptions I pasted earlier.
Re: org.apache.spark.SparkException: Task failed while writing rows
There should be another exception trace (basically, the actual cause) after this one, could you post it?

On Wed, Feb 28, 2018 at 1:39 PM, unk1102 wrote:
> Hi, I am getting the following exception when I try to write a DataFrame
> using the following code. Please guide. I am using Spark 2.2.0.
>
>     df.write.format("parquet").mode(SaveMode.Append);
>
> org.apache.spark.SparkException: Task failed while writing rows
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>   at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
>   at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
>   at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
org.apache.spark.SparkException: Task failed while writing rows
Hi, I am getting the following exception when I try to write a DataFrame using the following code. Please guide. I am using Spark 2.2.0.

    df.write.format("parquet").mode(SaveMode.Append);

org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:108)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
  at org.apache.spark.sql.SparkSession$$anonfun$3.apply(SparkSession.scala:468)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:324)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
Joins in spark for large tables
Hi,

Is there a best approach to reduce shuffling in Spark? I have two tables and both of them are large. Any suggestions? I have only seen broadcast joins mentioned, but that will not work in my case.

Thanks,
Asmath
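For example, would pre-bucketing both tables on the join key help, so the sort-merge join can avoid a full shuffle? A rough sketch of what I mean (table names, join key, and bucket count are purely illustrative, and dfA/dfB stand for my two large DataFrames):

    // Sketch: write both large tables bucketed (and sorted) on the join key,
    // so a later join can read matching buckets without a full shuffle.
    dfA.write.bucketBy(200, "customer_id").sortBy("customer_id").saveAsTable("table_a_bucketed")
    dfB.write.bucketBy(200, "customer_id").sortBy("customer_id").saveAsTable("table_b_bucketed")

    val joined = spark.table("table_a_bucketed")
      .join(spark.table("table_b_bucketed"), "customer_id")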
Re: how to add columns to row when column has a different encoder?
Anyone know a way to do this right now? As best as I can tell, I need a custom Expression to pass to udf to do this. I just finished a protobuf encoder, and it feels like Expression is not meant to be public (a good amount of things are private[sql]); am I wrong about this? Am I looking at the right interface to add such a UDF?

Thanks for your help!

On Mon, Feb 26, 2018, 3:50 PM David Capwell wrote:
> I have a row that looks like the following pojo
>
>     case class Wrapper(var id: String, var bytes: Array[Byte])
>
> Those bytes are a serialized pojo that looks like this
>
>     case class Inner(var stuff: String, var moreStuff: String)
>
> I right now have encoders for both types, but I don't see how to merge
> the two into a unified row that looks like the following
>
>     struct<id: string, inner: struct<stuff: string, moreStuff: string>>
>
> If I know how to deserialize the bytes and have an encoder, how could I get
> the above schema? I was looking at ds.withColumn("inner", ???) but wasn't
> sure how to go from pojo + encoder to a column. Is there a better way to
> do this?
>
> Thanks for your time reading this email
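To be concrete about what I'm after: would wrapping the deserializer in a plain UDF, roughly like the sketch below, be the intended route? Here deserializeInner stands in for my existing decoder, and ds is the Dataset of Wrapper.

    import org.apache.spark.sql.functions.{col, udf}

    // Placeholder for my existing byte-array decoder.
    def deserializeInner(bytes: Array[Byte]): Inner = ???

    // Sketch: wrap the decoder in a UDF so the decoded Inner becomes a
    // nested struct column alongside the id.
    val toInner = udf((bytes: Array[Byte]) => deserializeInner(bytes))

    val unified = ds.withColumn("inner", toInner(col("bytes"))).drop("bytes")
    // expected schema: struct<id: string, inner: struct<stuff: string, moreStuff: string>>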