[ https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aydin Kocas updated SPARK-23337: -------------------------------- Description: Hi, when using a nested object (that is, a field within a struct; concretely, _source.createTime) from a JSON file as the parameter for the withWatermark method, I get an exception (see below). Everything else works flawlessly with the nested object. +*{color:#14892c}works:{color}*+ {code:java} Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime", "10 seconds").toDF();{code} json structure: {code:java} root |-- _id: string (nullable = true) |-- _index: string (nullable = true) |-- _score: long (nullable = true) |-- myTime: timestamp (nullable = true) ..{code} +*{color:#d04437}does not work - nested json{color}:*+ {code:java} Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime", "10 seconds").toDF();{code} json structure: {code:java} root |-- _id: string (nullable = true) |-- _index: string (nullable = true) |-- _score: long (nullable = true) |-- _source: struct (nullable = true) | |-- createTime: timestamp (nullable = true) .. 
Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: 'EventTimeWatermark '_source.createTime, interval 10 seconds +- Deduplicate [_id#0], true +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at 
org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67) at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:165) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171) at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62) at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889) at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:569) at my.package.hpstatistics.Importer.importViaSparkStreaming(Importer.java:278) at my.package.hpstatistics.Main.main(Main.java:80) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Failed to copy node. Is otherCopyArgs specified correctly for EventTimeWatermark. Exception message: argument type mismatch ctor: public org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)? types: class org.apache.spark.sql.catalyst.expressions.Alias, class org.apache.spark.unsafe.types.CalendarInterval, class org.apache.spark.sql.catalyst.plans.logical.Deduplicate args: _source#3.createTime AS createTime#12, interval 10 seconds, Deduplicate [_id#0], true +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), 
StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] , tree: 'EventTimeWatermark '_source.createTime, interval 10 seconds +- Deduplicate [_id#0], true +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), 
StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:415) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:385) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 29 more {code} was: Hi, when using a nested object (here: _source.creationTime) from a json file as the parameter for the withWatermark-method, I get an exception (see below). Anything else works flawlessly with the nested object. 
+*{color:#14892c}works:{color}*+ {code:java} Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime", "10 seconds").toDF();{code} json structure: {code:java} root |-- _id: string (nullable = true) |-- _index: string (nullable = true) |-- _score: long (nullable = true) |-- myTime: timestamp (nullable = true) ..{code} +*{color:#d04437}does not work - nested json{color}:*+ {code:java} Dataset<Row> jsonRow = spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime", "10 seconds").toDF();{code} json structure: {code:java} root |-- _id: string (nullable = true) |-- _index: string (nullable = true) |-- _score: long (nullable = true) |-- _source: struct (nullable = true) | |-- createTime: timestamp (nullable = true) .. Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree: 'EventTimeWatermark '_source.createTime, interval 10 seconds +- Deduplicate [_id#0], true +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), 
StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385) at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300) at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674) at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) at scala.collection.immutable.List.foldLeft(List.scala:84) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) at scala.collection.immutable.List.foreach(List.scala:381) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:165) at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171) at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62) at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889) at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:569) at my.package.hpstatistics.Importer.importViaSparkStreaming(Importer.java:278) at my.package.hpstatistics.Main.main(Main.java:80) Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Failed to copy node. Is otherCopyArgs specified correctly for EventTimeWatermark. Exception message: argument type mismatch ctor: public org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)? 
types: class org.apache.spark.sql.catalyst.expressions.Alias, class org.apache.spark.unsafe.types.CalendarInterval, class org.apache.spark.sql.catalyst.plans.logical.Deduplicate args: _source#3.createTime AS createTime#12, interval 10 seconds, Deduplicate [_id#0], true +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] , tree: 
'EventTimeWatermark '_source.createTime, interval 10 seconds +- Deduplicate [_id#0], true +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), StructField(_index,StringType,true), StructField(_score,LongType,true), StructField(_source,StructType(StructField(additionalData,StringType,true), StructField(client,StringType,true), StructField(clientDomain,BooleanType,true), StructField(clientVersion,StringType,true), StructField(country,StringType,true), StructField(countryName,StringType,true), StructField(createTime,TimestampType,true), StructField(externalIP,StringType,true), StructField(hostname,StringType,true), StructField(internalIP,StringType,true), StructField(location,StringType,true), StructField(locationDestination,StringType,true), StructField(login,StringType,true), StructField(originalRequestString,StringType,true), StructField(password,StringType,true), StructField(peerIdent,StringType,true), StructField(peerType,StringType,true), StructField(recievedTime,TimestampType,true), StructField(sessionEnd,StringType,true), StructField(sessionStart,StringType,true), StructField(sourceEntryAS,StringType,true), StructField(sourceEntryIp,StringType,true), StructField(sourceEntryPort,StringType,true), StructField(targetCountry,StringType,true), StructField(targetCountryName,StringType,true), StructField(targetEntryAS,StringType,true), StructField(targetEntryIp,StringType,true), StructField(targetEntryPort,StringType,true), StructField(targetport,StringType,true), StructField(username,StringType,true), StructField(vulnid,StringType,true)),true), StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:415) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:385) at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) ... 29 more {code} Summary: withWatermark raises an exception on struct objects (was: withWatermark raises an exception on nested objects) > withWatermark raises an exception on struct objects > --------------------------------------------------- > > Key: SPARK-23337 > URL: https://issues.apache.org/jira/browse/SPARK-23337 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.2.1 > Environment: Linux Ubuntu, Spark on standalone mode > Reporter: Aydin Kocas > Priority: Major > > Hi, > > when using a nested object (that is, a field within a struct; concretely, > _source.createTime) from a JSON file as the parameter for the > withWatermark method, I get an exception (see below). > Everything else works flawlessly with the nested object. > > +*{color:#14892c}works:{color}*+ > {code:java} > Dataset<Row> jsonRow = > spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime", > "10 seconds").toDF();{code} > > json structure: > {code:java} > root > |-- _id: string (nullable = true) > |-- _index: string (nullable = true) > |-- _score: long (nullable = true) > |-- myTime: timestamp (nullable = true) > ..{code} > +*{color:#d04437}does not work - nested json{color}:*+ > {code:java} > Dataset<Row> jsonRow = > spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime", > "10 seconds").toDF();{code} > > json structure: > > {code:java} > root > |-- _id: string (nullable = true) > |-- _index: string (nullable = true) > |-- _score: long (nullable = true) > |-- _source: struct (nullable = true) > | |-- createTime: timestamp (nullable = true) > .. 
> > Exception in thread "main" > org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, > tree: > 'EventTimeWatermark '_source.createTime, interval 10 seconds > +- Deduplicate [_id#0], true > +- StreamingRelation > DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), > StructField(_index,StringType,true), StructField(_score,LongType,true), > StructField(_source,StructType(StructField(additionalData,StringType,true), > StructField(client,StringType,true), > StructField(clientDomain,BooleanType,true), > StructField(clientVersion,StringType,true), > StructField(country,StringType,true), > StructField(countryName,StringType,true), > StructField(createTime,TimestampType,true), > StructField(externalIP,StringType,true), > StructField(hostname,StringType,true), > StructField(internalIP,StringType,true), > StructField(location,StringType,true), > StructField(locationDestination,StringType,true), > StructField(login,StringType,true), > StructField(originalRequestString,StringType,true), > StructField(password,StringType,true), > StructField(peerIdent,StringType,true), > StructField(peerType,StringType,true), > StructField(recievedTime,TimestampType,true), > StructField(sessionEnd,StringType,true), > StructField(sessionStart,StringType,true), > StructField(sourceEntryAS,StringType,true), > StructField(sourceEntryIp,StringType,true), > StructField(sourceEntryPort,StringType,true), > StructField(targetCountry,StringType,true), > StructField(targetCountryName,StringType,true), > StructField(targetEntryAS,StringType,true), > StructField(targetEntryIp,StringType,true), > StructField(targetEntryPort,StringType,true), > StructField(targetport,StringType,true), > StructField(username,StringType,true), > StructField(vulnid,StringType,true)),true), > StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), > FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] > at 
org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56) > at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300) > at > org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62) > at > org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70) > at > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796) > at > org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82) > at > scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124) > at scala.collection.immutable.List.foldLeft(List.scala:84) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74) > at scala.collection.immutable.List.foreach(List.scala:381) > at > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74) > at > 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69) > at > org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67) > at > org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50) > at org.apache.spark.sql.Dataset.<init>(Dataset.scala:165) > at org.apache.spark.sql.Dataset.<init>(Dataset.scala:171) > at org.apache.spark.sql.Dataset$.apply(Dataset.scala:62) > at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2889) > at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:569) > at > my.package.hpstatistics.Importer.importViaSparkStreaming(Importer.java:278) > at my.package.hpstatistics.Main.main(Main.java:80) > Caused by: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: > Failed to copy node. > Is otherCopyArgs specified correctly for EventTimeWatermark. > Exception message: argument type mismatch > ctor: public > org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark(org.apache.spark.sql.catalyst.expressions.Attribute,org.apache.spark.unsafe.types.CalendarInterval,org.apache.spark.sql.catalyst.plans.logical.LogicalPlan)? 
> types: class org.apache.spark.sql.catalyst.expressions.Alias, class > org.apache.spark.unsafe.types.CalendarInterval, class > org.apache.spark.sql.catalyst.plans.logical.Deduplicate > args: _source#3.createTime AS createTime#12, interval 10 seconds, Deduplicate > [_id#0], true > +- StreamingRelation > DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), > StructField(_index,StringType,true), StructField(_score,LongType,true), > StructField(_source,StructType(StructField(additionalData,StringType,true), > StructField(client,StringType,true), > StructField(clientDomain,BooleanType,true), > StructField(clientVersion,StringType,true), > StructField(country,StringType,true), > StructField(countryName,StringType,true), > StructField(createTime,TimestampType,true), > StructField(externalIP,StringType,true), > StructField(hostname,StringType,true), > StructField(internalIP,StringType,true), > StructField(location,StringType,true), > StructField(locationDestination,StringType,true), > StructField(login,StringType,true), > StructField(originalRequestString,StringType,true), > StructField(password,StringType,true), > StructField(peerIdent,StringType,true), > StructField(peerType,StringType,true), > StructField(recievedTime,TimestampType,true), > StructField(sessionEnd,StringType,true), > StructField(sessionStart,StringType,true), > StructField(sourceEntryAS,StringType,true), > StructField(sourceEntryIp,StringType,true), > StructField(sourceEntryPort,StringType,true), > StructField(targetCountry,StringType,true), > StructField(targetCountryName,StringType,true), > StructField(targetEntryAS,StringType,true), > StructField(targetEntryIp,StringType,true), > StructField(targetEntryPort,StringType,true), > StructField(targetport,StringType,true), > StructField(username,StringType,true), > StructField(vulnid,StringType,true)),true), > StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), > 
FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] > , tree: > 'EventTimeWatermark '_source.createTime, interval 10 seconds > +- Deduplicate [_id#0], true > +- StreamingRelation > DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true), > StructField(_index,StringType,true), StructField(_score,LongType,true), > StructField(_source,StructType(StructField(additionalData,StringType,true), > StructField(client,StringType,true), > StructField(clientDomain,BooleanType,true), > StructField(clientVersion,StringType,true), > StructField(country,StringType,true), > StructField(countryName,StringType,true), > StructField(createTime,TimestampType,true), > StructField(externalIP,StringType,true), > StructField(hostname,StringType,true), > StructField(internalIP,StringType,true), > StructField(location,StringType,true), > StructField(locationDestination,StringType,true), > StructField(login,StringType,true), > StructField(originalRequestString,StringType,true), > StructField(password,StringType,true), > StructField(peerIdent,StringType,true), > StructField(peerType,StringType,true), > StructField(recievedTime,TimestampType,true), > StructField(sessionEnd,StringType,true), > StructField(sessionStart,StringType,true), > StructField(sourceEntryAS,StringType,true), > StructField(sourceEntryIp,StringType,true), > StructField(sourceEntryPort,StringType,true), > StructField(targetCountry,StringType,true), > StructField(targetCountryName,StringType,true), > StructField(targetEntryAS,StringType,true), > StructField(targetEntryIp,StringType,true), > StructField(targetEntryPort,StringType,true), > StructField(targetport,StringType,true), > StructField(username,StringType,true), > StructField(vulnid,StringType,true)),true), > StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), > FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4] > at > 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:415) > at > org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$makeCopy$1.apply(TreeNode.scala:385) > at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52) > ... 29 more > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org