Could you give the full stack trace of the exception?

Also, can you run `dataframe2.explain(true)` and show us the plan output?
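
A minimal sketch of how both could be collected, assuming the schema and the ./data/ path from the quoted code below. The object name ExplainStreamingPlan is hypothetical, and this has not been checked against 2.4.0-SNAPSHOT:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical object name; schema and input path are copied from the quoted code.
object ExplainStreamingPlan {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ExplainStreamingPlan")
      .master("local[4]")
      .getOrCreate()

    val schema = StructType(Array(
      StructField("id", StringType, nullable = false),
      StructField("visit", StringType, nullable = false)))

    val dataframeInput = spark.readStream
      .option("sep", "\t")
      .schema(schema)
      .csv("./data/")
    val dataframe2 = dataframeInput.withColumn("cts", current_timestamp())

    // extended = true prints the parsed, analyzed, optimized and physical plans.
    dataframe2.explain(true)

    val query = dataframe2.writeStream
      .format("console")
      .option("truncate", "false")
      .start()

    try {
      query.awaitTermination()
    } catch {
      // Print the complete trace, including any "Caused by" sections.
      case e: Exception => e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}
```

The plan goes to stdout before the query starts, so it should appear even if the query later fails.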



On Wed, Jan 31, 2018 at 3:35 PM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Hi Folks:
>
> I have to add a column to a structured *streaming* dataframe, but when I
> do that (using select or withColumn) I get an exception.  I can add a
> column to a structured *non-streaming* dataframe. I could not find any
> documentation on how to do this in the following doc:
> [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
>
> I am using spark 2.4.0-SNAPSHOT
>
> Please let me know what I could be missing.
>
> Thanks for your help.
>
> (I am also attaching the source code for the structured streaming,
> structured non-streaming classes and input file with this email)
>
> <exception>
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
> </exception>
>
> Here is the input file (in the ./data directory) - note tokens are
> separated by '\t'
>
> 1 v1
> 2 v1
> 2 v2
> 3 v3
> 3 v1
>
> Here is the code with dataframe (*non-streaming*) which works:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
>
> object StructuredTest {
>   def main(args:Array[String]) : Unit = {
>     val sparkBuilder = SparkSession
>       .builder.
>       appName("StreamingTest").master("local[4]")
>
>     val spark = sparkBuilder.getOrCreate()
>
>     val schema = StructType(
>         Array(
>           StructField("id", StringType, false),
>           StructField("visit", StringType, false)
>           ))
>     val dataframe = spark.read.option("sep","\t").schema(schema).csv("./data/")
>     val dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
>     dataframe2.show(false)
>     spark.stop()
>
>   }
> }
>
> Output of the above code is:
>
> +---+-----+-----------------------+
> |id |visit|cts                    |
> +---+-----+-----------------------+
> |1  |v1   |2018-01-31 15:07:00.758|
> |2  |v1   |2018-01-31 15:07:00.758|
> |2  |v2   |2018-01-31 15:07:00.758|
> |3  |v3   |2018-01-31 15:07:00.758|
> |3  |v1   |2018-01-31 15:07:00.758|
> +---+-----+-----------------------+
>
>
> Here is the code with *structured streaming* which throws the exception:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
>     val sparkBuilder = SparkSession
>       .builder.
>       config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>       appName("StreamingTest").master("local[4]")
>
>     val spark = sparkBuilder.getOrCreate()
>
>     val schema = StructType(
>         Array(
>           StructField("id", StringType, false),
>           StructField("visit", StringType, false)
>           ))
>     val dataframeInput = spark.readStream.option("sep","\t").schema(schema).csv("./data/")
>     var dataframe2 = dataframeInput.select("*")
>     dataframe2 = dataframe2.withColumn("cts", current_timestamp())
>     val query = dataframe2.writeStream.option("truncate","false").format("console").start
>     query.awaitTermination()
>   }
> }
>
> Here is the exception:
>
> 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
> *org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts*
> at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
> at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
>
> I've also tried the select snippet from
> (https://docs.databricks.com/spark/latest/structured-streaming/examples.html),
> marked in the code below, but still get the same exception:
>
> Here is the code:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args:Array[String]) : Unit = {
>     val sparkBuilder = SparkSession
>       .builder.
>       config("spark.sql.streaming.checkpointLocation", "./checkpointes").
>       appName("StreamingTest").master("local[4]")
>
>     val spark = sparkBuilder.getOrCreate()
>
>     val schema = StructType(
>         Array(
>           StructField("id", StringType, false),
>           StructField("visit", StringType, false)
>           ))
>     val dataframeInput = spark.readStream.option("sep","\t").schema(schema).csv("./data/")
>     // select with current_timestamp(): the snippet taken from the Databricks examples page
>     val dataframe2 = dataframeInput.select(
>       current_timestamp().cast("timestamp").alias("timestamp"),
>       expr("*"))
>     val query = dataframe2.writeStream.option("truncate","false").format("console").start
>     query.awaitTermination()
>   }
> }
>
>
> And the exception:
>
> 18/01/31 15:31:00 ERROR MicroBatchExecution: Query [id = 26b2afd9-797e-49ce-b026-0bd5321536e1, runId = d8ac5386-9d59-4897-b05b-2750b29c05ca] terminated with error
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp
>
