[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16580440#comment-16580440 ] Aydin Kocas commented on SPARK-23337:

[~marmbrus] Can you give a hint how to do it with "withColumn" in Java?

{code:java}
Dataset<Row> jsonRow = spark.readStream()
    .schema(...)
    .json(..)
    .withColumn("createTime", ??)
    .withWatermark("createTime", "10 minutes");
{code}

> withWatermark raises an exception on struct objects
> ---
>
>                 Key: SPARK-23337
>                 URL: https://issues.apache.org/jira/browse/SPARK-23337
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.2.1
>        Environment: Linux Ubuntu, Spark on standalone mode
>           Reporter: Aydin Kocas
>           Priority: Major
>
> Hi,
>
> when using a nested object (I mean an object within a struct, here concretely: _source.createTime) from a JSON file as the parameter for the withWatermark method, I get an exception (see below). Everything else works flawlessly with the nested object.
>
> +*{color:#14892c}works:{color}*+
> {code:java}
> Dataset jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime", "10 seconds").toDF();{code}
>
> json structure:
> {code:java}
> root
> |-- _id: string (nullable = true)
> |-- _index: string (nullable = true)
> |-- _score: long (nullable = true)
> |-- myTime: timestamp (nullable = true)
> ..{code}
>
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime", "10 seconds").toDF();{code}
>
> json structure:
> {code:java}
> root
> |-- _id: string (nullable = true)
> |-- _index: string (nullable = true)
> |-- _score: long (nullable = true)
> |-- _source: struct (nullable = true)
> |    |-- createTime: timestamp (nullable = true)
> ..
>
> Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>    +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
> at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
> at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
> at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
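For anyone hitting the same wall, one plausible shape of the withColumn workaround asked about above is to copy the nested timestamp into a top-level column and set the watermark on that, since withWatermark only resolves top-level attributes. This is a sketch, not a confirmed answer from the Spark developers: the schema below is a trimmed stand-in for the getJSONSchema() used in the report, and the ./input/ path is taken from the stack trace.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;

public class NestedWatermark {

    // Minimal stand-in for the getJSONSchema() from the report:
    // only _id and the nested _source.createTime are kept.
    static StructType jsonSchema() {
        StructType source = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("createTime", DataTypes.TimestampType, true)
        });
        return DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("_id", DataTypes.StringType, true),
                DataTypes.createStructField("_source", source, true)
        });
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("nested-watermark-sketch")
                .getOrCreate();

        // Promote the nested field to a top-level column first, then
        // declare the watermark on the promoted column.
        Dataset<Row> jsonRow = spark.readStream()
                .schema(jsonSchema())
                .json("./input/")
                .withColumn("createTime", col("_source.createTime"))
                .withWatermark("createTime", "10 minutes")
                .dropDuplicates("_id");

        jsonRow.printSchema();
        spark.stop();
    }
}
```

With this ordering, dropDuplicates runs after the watermark is set, so the deduplication state can also be aged out by event time.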
[jira] [Commented] (SPARK-20894) Error while checkpointing to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452041#comment-16452041 ] Aydin Kocas commented on SPARK-20894:

Removing the checkpoint location along with the _spark_metadata folder in the affected writeStream output folder helped to get rid of the issue, but it should be noted that the situation persists in Spark 2.3. It seems there is some bad state in _spark_metadata - it happened unexpectedly without any code change - so to me it looks like a bug somewhere. I am not using any HDFS; I am developing locally without a cluster.

> Error while checkpointing to HDFS
> ---
>
>                 Key: SPARK-20894
>                 URL: https://issues.apache.org/jira/browse/SPARK-20894
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.1.1
>        Environment: Ubuntu, Spark 2.1.1, hadoop 2.7
>           Reporter: kant kodali
>           Assignee: Shixiong Zhu
>           Priority: Major
>             Fix For: 2.3.0
>
>        Attachments: driver_info_log, executor1_log, executor2_log
>
> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 hours", "24 hours"), df1.col("AppName")).count();
> StreamingQuery query = df2.writeStream().foreach(new KafkaSink()).option("checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update").start();
> query.awaitTermination();
>
> This for some reason fails with the error:
>
> ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalStateException: Error reading delta file /usr/local/hadoop/checkpoint/state/0/0/1.delta of HDFSStateStoreProvider[id = (op=0, part=0), dir = /usr/local/hadoop/checkpoint/state/0/0]: /usr/local/hadoop/checkpoint/state/0/0/1.delta does not exist
>
> I did clear all the checkpoint data in /usr/local/hadoop/checkpoint/ and all consumer offsets in Kafka from all brokers prior to running and yet this error still persists.
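The reset described in the comment above (wiping the checkpoint location and the sink's _spark_metadata folder so the query starts from a clean state) can be scripted. This is a sketch in plain Java, not an official remedy: the checkpoint path is the one from this issue and the output directory is a placeholder. Deleting these folders discards all streaming progress and state, so the query effectively restarts from scratch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class ResetStreamState {

    // Recursively delete a directory tree if it exists,
    // visiting children before parents.
    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;  // nothing to clean up
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder())
                .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder paths - substitute your own checkpointLocation
        // and writeStream output directory.
        deleteRecursively(Paths.get("/usr/local/hadoop/checkpoint"));
        deleteRecursively(Paths.get("./output/_spark_metadata"));
    }
}
```

Run this only while the query is stopped; Structured Streaming recreates both folders on the next start().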
-- This message was sent by Atlassian JIRA (v7.6.3#76005) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20894) Error while checkpointing to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16452030#comment-16452030 ] Aydin Kocas commented on SPARK-20894:

Having the same issue on 2.3 - what's the solution? I am developing locally on a standard ext4 fs, not in an HDFS cluster, but I see the same HDFS-related entries in the log along with the error message.
[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16432200#comment-16432200 ] Aydin Kocas commented on SPARK-23337:

Hi Michael, in my case it's a blocking issue and unfortunately not just annoying, because I can't use the watermarking functionality when doing readStream on a JSON file. Without the time limitation that watermarking provides, I am concerned that my checkpoint dir will grow over time because there are no time boundaries.
[jira] [Updated] (SPARK-23337) withWatermark raises an exception on struct objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aydin Kocas updated SPARK-23337:
[jira] [Updated] (SPARK-23337) withWatermark raises an exception on nested objects
[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aydin Kocas updated SPARK-23337:
[jira] [Created] (SPARK-23337) withWatermark raises an exception on nested objects
Aydin Kocas created SPARK-23337:
-----------------------------------

             Summary: withWatermark raises an exception on nested objects
                 Key: SPARK-23337
                 URL: https://issues.apache.org/jira/browse/SPARK-23337
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.1
         Environment: Linux Ubuntu, Spark on standalone mode
            Reporter: Aydin Kocas