Could you give the full stack trace of the exception? Also, can you do `dataframe2.explain(true)` and show us the plan output?
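For reference, a minimal sketch of what would help (assuming the `dataframe2` and `query` from your streaming code below):

    // Before start(): prints the parsed, analyzed, optimized, and
    // physical plans of the streaming DataFrame to stdout.
    dataframe2.explain(true)

    // After start(): prints the physical plan of the most recently
    // executed micro-batch.
    query.explain()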
On Wed, Jan 31, 2018 at 3:35 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

> Hi Folks:
>
> I have to add a column to a structured *streaming* dataframe, but when I
> do that (using select or withColumn) I get an exception. I can add a
> column to a structured *non-streaming* dataframe without any problem. I
> could not find any documentation on how to do this in the following doc:
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
>
> I am using Spark 2.4.0-SNAPSHOT.
>
> Please let me know what I could be missing.
>
> Thanks for your help.
>
> (I am also attaching the source code for the structured streaming and
> structured non-streaming classes, along with the input file.)
>
> <exception>
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
>   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
>   at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
> </exception>
>
> Here is the input file (in the ./data directory) - note that tokens are
> separated by '\t':
>
> 1    v1
> 2    v1
> 2    v2
> 3    v3
> 3    v1
>
> Here is the *non-streaming* dataframe code, which works:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
>
> object StructuredTest {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder
>       .appName("StreamingTest")
>       .master("local[4]")
>       .getOrCreate()
>
>     val schema = StructType(Array(
>       StructField("id", StringType, false),
>       StructField("visit", StringType, false)
>     ))
>     val dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
>     val dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
>     dataframe2.show(false)
>     spark.stop()
>   }
> }
>
> Output of the above code is:
>
> +---+-----+-----------------------+
> |id |visit|cts                    |
> +---+-----+-----------------------+
> |1  |v1   |2018-01-31 15:07:00.758|
> |2  |v1   |2018-01-31 15:07:00.758|
> |2  |v2   |2018-01-31 15:07:00.758|
> |3  |v3   |2018-01-31 15:07:00.758|
> |3  |v1   |2018-01-31 15:07:00.758|
> +---+-----+-----------------------+
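[Side note: in batch mode the select and withColumn forms are
interchangeable; a quick sketch, assuming the `dataframe` defined above:

    // Equivalent to select(expr("*"), current_timestamp().as("cts")):
    // withColumn appends the new column after all existing columns.
    val dataframe2b = dataframe.withColumn("cts", current_timestamp())
    dataframe2b.show(false)

so the choice of API is unlikely to be the issue here.]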
> Here is the code with *structured streaming* which throws the exception:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder
>       .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
>       .appName("StreamingTest")
>       .master("local[4]")
>       .getOrCreate()
>
>     val schema = StructType(Array(
>       StructField("id", StringType, false),
>       StructField("visit", StringType, false)
>     ))
>     val dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
>     var dataframe2 = dataframeInput.select("*")
>     dataframe2 = dataframe2.withColumn("cts", current_timestamp())
>     val query = dataframe2.writeStream.option("truncate", "false").format("console").start()
>     query.awaitTermination()
>   }
> }
>
> Here is the exception:
>
> 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
> *org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts*
>   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
>   at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
>   at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
>
> I've also tried snippets from
> https://docs.databricks.com/spark/latest/structured-streaming/examples.html
> (the changed lines are marked below), but I still get the same exception.
>
> Here is the code:
>
> import scala.collection.immutable
> import org.apache.spark.sql.functions._
> import org.joda.time._
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.streaming._
> import org.apache.log4j._
>
> object StreamingTest {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder
>       .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
>       .appName("StreamingTest")
>       .master("local[4]")
>       .getOrCreate()
>
>     val schema = StructType(Array(
>       StructField("id", StringType, false),
>       StructField("visit", StringType, false)
>     ))
>     val dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
>     val dataframe2 = dataframeInput.select(  // changed: taken from the Databricks example
>       current_timestamp().cast("timestamp").alias("timestamp"),
>       expr("*"))
>     val query = dataframe2.writeStream.option("truncate", "false").format("console").start()
>     query.awaitTermination()
>   }
> }
>
> And the exception:
>
> 18/01/31 15:31:00 ERROR MicroBatchExecution: Query [id = 26b2afd9-797e-49ce-b026-0bd5321536e1, runId = d8ac5386-9d59-4897-b05b-2750b29c05ca] terminated with error
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp
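To help narrow this down, here is a self-contained reproduction sketch
using the built-in rate source, which takes the file source and the
user-defined schema out of the picture (a sketch only - untested against
2.4.0-SNAPSHOT, and the object name and checkpoint path are placeholders):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    object RateRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder
          .appName("RateRepro")
          .master("local[4]")
          .getOrCreate()

        // The rate source emits (timestamp, value) rows, so no input
        // files or schema definition are needed.
        val df = spark.readStream.format("rate").load()
          .withColumn("cts", current_timestamp())  // the same call that fails above

        val query = df.writeStream
          .format("console")
          .option("truncate", "false")
          .option("checkpointLocation", "./checkpoint-rate")  // placeholder path
          .start()
        query.awaitTermination()
      }
    }

If this fails with the same UnresolvedException, that would suggest the
problem is in how the micro-batch planner resolves current_timestamp(),
not in anything specific to your input handling.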