[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-08-14 Thread Aydin Kocas (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16580440#comment-16580440
 ] 

Aydin Kocas commented on SPARK-23337:
-

[~marmbrus] Can you give a hint on how to do this with "withColumn" in Java?

 
Dataset<Row> jsonRow = spark.readStream()
    .schema(...)
    .json(...)
    .withColumn("createTime", ?? )
    .withWatermark("createTime", "10 minutes");

> withWatermark raises an exception on struct objects
> ---
>
> Key: SPARK-23337
> URL: https://issues.apache.org/jira/browse/SPARK-23337
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 2.2.1
> Environment: Linux Ubuntu, Spark on standalone mode
>Reporter: Aydin Kocas
>Priority: Major
>
> Hi,
>  
> When using a nested object (i.e. an object within a struct, concretely 
> _source.createTime) from a JSON file as the parameter for the 
> withWatermark method, I get an exception (see below).
> Everything else works flawlessly with the nested object.
>  
> +*{color:#14892c}works:{color}*+ 
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- myTime: timestamp (nullable = true)
> ..{code}
> +*{color:#d04437}does not work - nested json{color}:*+
> {code:java}
> Dataset jsonRow = 
> spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
>  "10 seconds").toDF();{code}
>  
> json structure:
>  
> {code:java}
> root
>  |-- _id: string (nullable = true)
>  |-- _index: string (nullable = true)
>  |-- _score: long (nullable = true)
>  |-- _source: struct (nullable = true)
>  | |-- createTime: timestamp (nullable = true)
> ..
>  
> Exception in thread "main" 
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> 'EventTimeWatermark '_source.createTime, interval 10 seconds
> +- Deduplicate [_id#0], true
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
>  StructField(_index,StringType,true), StructField(_score,LongType,true), 
> StructField(_source,StructType(StructField(additionalData,StringType,true), 
> StructField(client,StringType,true), 
> StructField(clientDomain,BooleanType,true), 
> StructField(clientVersion,StringType,true), 
> StructField(country,StringType,true), 
> StructField(countryName,StringType,true), 
> StructField(createTime,TimestampType,true), 
> StructField(externalIP,StringType,true), 
> StructField(hostname,StringType,true), 
> StructField(internalIP,StringType,true), 
> StructField(location,StringType,true), 
> StructField(locationDestination,StringType,true), 
> StructField(login,StringType,true), 
> StructField(originalRequestString,StringType,true), 
> StructField(password,StringType,true), 
> StructField(peerIdent,StringType,true), 
> StructField(peerType,StringType,true), 
> StructField(recievedTime,TimestampType,true), 
> StructField(sessionEnd,StringType,true), 
> StructField(sessionStart,StringType,true), 
> StructField(sourceEntryAS,StringType,true), 
> StructField(sourceEntryIp,StringType,true), 
> StructField(sourceEntryPort,StringType,true), 
> StructField(targetCountry,StringType,true), 
> StructField(targetCountryName,StringType,true), 
> StructField(targetEntryAS,StringType,true), 
> StructField(targetEntryIp,StringType,true), 
> StructField(targetEntryPort,StringType,true), 
> StructField(targetport,StringType,true), 
> StructField(username,StringType,true), 
> StructField(vulnid,StringType,true)),true), 
> StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
> FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
>  at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
>  at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>  at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
>  at 
> 

[jira] [Commented] (SPARK-20894) Error while checkpointing to HDFS

2018-04-25 Thread Aydin Kocas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452041#comment-16452041
 ] 

Aydin Kocas commented on SPARK-20894:
-

Removing the checkpoint location along with the _spark_metadata folder in the 
affected writeStream output folder helped to get rid of the issue, but 

it should be noted that the situation persists in Spark 2.3.

It seems that there is some bad state in _spark_metadata - it happened 
unexpectedly, without any code change - therefore to me it looks like a bug 
somewhere. I am not using HDFS; I am developing locally without a cluster.
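
For anyone in the same state, a minimal sketch of the cleanup described above, assuming a local filesystem and a stopped query (both paths are placeholders for your own checkpointLocation and writeStream output folder):

{code:java}
import java.io.IOException;
import java.nio.file.*;
import java.util.Comparator;
import java.util.stream.Stream;

public class CheckpointCleanup {

    // Recursively delete a directory tree; a no-op if it does not exist.
    static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(root)) {
            walk.sorted(Comparator.reverseOrder()) // children before parents
                .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder paths: substitute your own checkpointLocation and
        // the _spark_metadata folder inside your writeStream output dir.
        deleteRecursively(Paths.get("/usr/local/hadoop/checkpoint"));
        deleteRecursively(Paths.get("./output/_spark_metadata"));
    }
}
{code}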

> Error while checkpointing to HDFS
> -
>
> Key: SPARK-20894
> URL: https://issues.apache.org/jira/browse/SPARK-20894
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.1.1
> Environment: Ubuntu, Spark 2.1.1, hadoop 2.7
>Reporter: kant kodali
>Assignee: Shixiong Zhu
>Priority: Major
> Fix For: 2.3.0
>
> Attachments: driver_info_log, executor1_log, executor2_log
>
>
> Dataset df2 = df1.groupBy(functions.window(df1.col("Timestamp5"), "24 
> hours", "24 hours"), df1.col("AppName")).count();
> StreamingQuery query = df2.writeStream().foreach(new 
> KafkaSink()).option("checkpointLocation","/usr/local/hadoop/checkpoint").outputMode("update").start();
> query.awaitTermination();
> This for some reason fails with the Error 
> ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
> java.lang.IllegalStateException: Error reading delta file 
> /usr/local/hadoop/checkpoint/state/0/0/1.delta of HDFSStateStoreProvider[id = 
> (op=0, part=0), dir = /usr/local/hadoop/checkpoint/state/0/0]: 
> /usr/local/hadoop/checkpoint/state/0/0/1.delta does not exist
> I did clear all the checkpoint data in /usr/local/hadoop/checkpoint/  and all 
> consumer offsets in Kafka from all brokers prior to running and yet this 
> error still persists. 






[jira] [Commented] (SPARK-20894) Error while checkpointing to HDFS

2018-04-25 Thread Aydin Kocas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16452030#comment-16452030
 ] 

Aydin Kocas commented on SPARK-20894:
-

I am having the same issue on 2.3 - what's the solution? I am developing 
locally on a standard ext4 filesystem, not in an HDFS cluster, but I get the 
same HDFS-related entries in the log along with the error message.







[jira] [Commented] (SPARK-23337) withWatermark raises an exception on struct objects

2018-04-10 Thread Aydin Kocas (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16432200#comment-16432200
 ] 

Aydin Kocas commented on SPARK-23337:
-

Hi Michael, in my case it's a blocking issue and unfortunately not just an 
annoyance, because I can't use the watermarking functionality when doing a 
readStream on a JSON file. Without the time limit that watermarking provides, 
I am concerned that my checkpoint dir will grow over time because there are 
no time boundaries.
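
For context, this is roughly the query shape I am after once the nested column is flattened (a sketch combining the withColumn workaround from above with the dedup from the report; note that, per the Structured Streaming guide, the event-time column must be part of the dedup keys for the watermark to expire dropDuplicates state):

{code:java}
import static org.apache.spark.sql.functions.col;

Dataset<Row> deduped = spark.readStream()
    .schema(getJSONSchema())
    .json(file)
    // Flatten the nested event-time field (workaround sketch, see above).
    .withColumn("createTime", col("_source.createTime"))
    // Late data beyond 10 seconds can be dropped ...
    .withWatermark("createTime", "10 seconds")
    // ... and including the event-time column in the keys lets Spark
    // expire old deduplication state instead of keeping it forever.
    .dropDuplicates(new String[] {"_id", "createTime"});
{code}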


[jira] [Updated] (SPARK-23337) withWatermark raises an exception on struct objects

2018-02-06 Thread Aydin Kocas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aydin Kocas updated SPARK-23337:

Description: 
Hi,

 

When using a nested object (i.e. an object within a struct, concretely 
_source.createTime) from a JSON file as the parameter for the 
withWatermark method, I get an exception (see below).

Everything else works flawlessly with the nested object.

 

+*{color:#14892c}works:{color}*+ 
{code:java}
Dataset jsonRow = 
spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
 "10 seconds").toDF();{code}
 

json structure:
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- myTime: timestamp (nullable = true)
..{code}
+*{color:#d04437}does not work - nested json{color}:*+
{code:java}
Dataset jsonRow = 
spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
 "10 seconds").toDF();{code}
 

json structure:

 
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 | |-- createTime: timestamp (nullable = true)
..
 
Exception in thread "main" 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark '_source.createTime, interval 10 seconds
+- Deduplicate [_id#0], true
 +- StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
 StructField(_index,StringType,true), StructField(_score,LongType,true), 
StructField(_source,StructType(StructField(additionalData,StringType,true), 
StructField(client,StringType,true), 
StructField(clientDomain,BooleanType,true), 
StructField(clientVersion,StringType,true), 
StructField(country,StringType,true), StructField(countryName,StringType,true), 
StructField(createTime,TimestampType,true), 
StructField(externalIP,StringType,true), StructField(hostname,StringType,true), 
StructField(internalIP,StringType,true), StructField(location,StringType,true), 
StructField(locationDestination,StringType,true), 
StructField(login,StringType,true), 
StructField(originalRequestString,StringType,true), 
StructField(password,StringType,true), StructField(peerIdent,StringType,true), 
StructField(peerType,StringType,true), 
StructField(recievedTime,TimestampType,true), 
StructField(sessionEnd,StringType,true), 
StructField(sessionStart,StringType,true), 
StructField(sourceEntryAS,StringType,true), 
StructField(sourceEntryIp,StringType,true), 
StructField(sourceEntryPort,StringType,true), 
StructField(targetCountry,StringType,true), 
StructField(targetCountryName,StringType,true), 
StructField(targetEntryAS,StringType,true), 
StructField(targetEntryIp,StringType,true), 
StructField(targetEntryPort,StringType,true), 
StructField(targetport,StringType,true), StructField(username,StringType,true), 
StructField(vulnid,StringType,true)),true), 
StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
 at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
 at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
 at 

[jira] [Updated] (SPARK-23337) withWatermark raises an exception on nested objects

2018-02-05 Thread Aydin Kocas (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-23337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aydin Kocas updated SPARK-23337:

Description: 
Hi,

 

When using a nested object (here: _source.creationTime) from a JSON file as the 
parameter for the withWatermark method, I get an exception (see below).

Everything else works flawlessly with the nested object.

 

+*{color:#14892c}works:{color}*+ 
{code:java}
Dataset jsonRow = 
spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
 "10 seconds").toDF();{code}
 

json structure:
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- myTime: timestamp (nullable = true)
..{code}
+*{color:#d04437}does not work - nested json{color}:*+
{code:java}
Dataset jsonRow = 
spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.createTime",
 "10 seconds").toDF();{code}
 

json structure:

 
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 | |-- createTime: timestamp (nullable = true)
..
 
Exception in thread "main" 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark '_source.createTime, interval 10 seconds
+- Deduplicate [_id#0], true
 +- StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
 StructField(_index,StringType,true), StructField(_score,LongType,true), 
StructField(_source,StructType(StructField(additionalData,StringType,true), 
StructField(client,StringType,true), 
StructField(clientDomain,BooleanType,true), 
StructField(clientVersion,StringType,true), 
StructField(country,StringType,true), StructField(countryName,StringType,true), 
StructField(createTime,TimestampType,true), 
StructField(externalIP,StringType,true), StructField(hostname,StringType,true), 
StructField(internalIP,StringType,true), StructField(location,StringType,true), 
StructField(locationDestination,StringType,true), 
StructField(login,StringType,true), 
StructField(originalRequestString,StringType,true), 
StructField(password,StringType,true), StructField(peerIdent,StringType,true), 
StructField(peerType,StringType,true), 
StructField(recievedTime,TimestampType,true), 
StructField(sessionEnd,StringType,true), 
StructField(sessionStart,StringType,true), 
StructField(sourceEntryAS,StringType,true), 
StructField(sourceEntryIp,StringType,true), 
StructField(sourceEntryPort,StringType,true), 
StructField(targetCountry,StringType,true), 
StructField(targetCountryName,StringType,true), 
StructField(targetEntryAS,StringType,true), 
StructField(targetEntryIp,StringType,true), 
StructField(targetEntryPort,StringType,true), 
StructField(targetport,StringType,true), StructField(username,StringType,true), 
StructField(vulnid,StringType,true)),true), 
StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
 at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
 at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
 at 

[jira] [Created] (SPARK-23337) withWatermark raises an exception on nested objects

2018-02-05 Thread Aydin Kocas (JIRA)
Aydin Kocas created SPARK-23337:
---

 Summary: withWatermark raises an exception on nested objects
 Key: SPARK-23337
 URL: https://issues.apache.org/jira/browse/SPARK-23337
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 2.2.1
 Environment: Linux Ubuntu, Spark on standalone mode
Reporter: Aydin Kocas


Hi,

 

When using a nested object (here: _source.creationTime) from a JSON file as the 
parameter for the withWatermark method, I get an exception (see below).

Everything else works flawlessly with the nested object.

 


+*{color:#14892c}works:{color}*+ 
{code:java}
Dataset jsonRow = 
spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("myTime",
 "10 seconds").toDF();{code}
 

json structure:
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- myTime: timestamp (nullable = true)
..{code}

+*{color:#d04437}does not work - nested json{color}:*+
{code:java}
Dataset jsonRow = 
spark.readStream().schema(getJSONSchema()).json(file).dropDuplicates("_id").withWatermark("_source.creationTime",
 "10 seconds").toDF();{code}
 

json structure:

 
{code:java}
root
 |-- _id: string (nullable = true)
 |-- _index: string (nullable = true)
 |-- _score: long (nullable = true)
 |-- _source: struct (nullable = true)
 | |-- creationTime: timestamp (nullable = true)
..
 
Exception in thread "main" 
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
'EventTimeWatermark '_source.createTime, interval 10 seconds
+- Deduplicate [_id#0], true
 +- StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@5dbbb292,json,List(),Some(StructType(StructField(_id,StringType,true),
 StructField(_index,StringType,true), StructField(_score,LongType,true), 
StructField(_source,StructType(StructField(additionalData,StringType,true), 
StructField(client,StringType,true), 
StructField(clientDomain,BooleanType,true), 
StructField(clientVersion,StringType,true), 
StructField(country,StringType,true), StructField(countryName,StringType,true), 
StructField(createTime,TimestampType,true), 
StructField(externalIP,StringType,true), StructField(hostname,StringType,true), 
StructField(internalIP,StringType,true), StructField(location,StringType,true), 
StructField(locationDestination,StringType,true), 
StructField(login,StringType,true), 
StructField(originalRequestString,StringType,true), 
StructField(password,StringType,true), StructField(peerIdent,StringType,true), 
StructField(peerType,StringType,true), 
StructField(recievedTime,TimestampType,true), 
StructField(sessionEnd,StringType,true), 
StructField(sessionStart,StringType,true), 
StructField(sourceEntryAS,StringType,true), 
StructField(sourceEntryIp,StringType,true), 
StructField(sourceEntryPort,StringType,true), 
StructField(targetCountry,StringType,true), 
StructField(targetCountryName,StringType,true), 
StructField(targetEntryAS,StringType,true), 
StructField(targetEntryIp,StringType,true), 
StructField(targetEntryPort,StringType,true), 
StructField(targetport,StringType,true), StructField(username,StringType,true), 
StructField(vulnid,StringType,true)),true), 
StructField(_type,StringType,true))),List(),None,Map(path -> ./input/),None), 
FileSource[./input/], [_id#0, _index#1, _score#2L, _source#3, _type#4]
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
 at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:385)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:300)
 at 
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:268)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:854)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:796)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
 at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:796)
 at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:674)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
 at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
 at