Re: Spark Streaming with mapGroupsWithState
I changed it to a Tuple2 and that problem is solved. Any thoughts on this message?

*Unapplied methods are only converted to functions when a function type is expected. You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.*

    .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)

On Mon, Mar 2, 2020 at 5:12 PM lec ssmi wrote:

> Maybe you can combine the fields you want to use into one field.
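For the record: mapGroupsWithState expects a three-argument function of the shape (key, events, state). The compiler's suggested `updateAcrossEvents(_,_,_,_,_)` hints that the method as written takes five parameters, so it cannot eta-expand to that shape, and appending `_` alone would not fix the arity mismatch. A minimal sketch of a signature that does compile, using hypothetical MyReport and MyState case classes and the Tuple2 key mentioned above:

    import org.apache.spark.sql.streaming.GroupState

    // Hypothetical stand-ins for the real row and state types.
    case class MyReport(key1: String, key2: String)
    case class MyState(count: Long)

    // Exactly three parameters: the group key, the new events for that
    // key in this trigger, and the mutable state handle.
    def updateAcrossEvents(
        key: (String, String),
        events: Iterator[MyReport],
        state: GroupState[MyState]): MyState = {
      val updated = MyState(state.getOption.map(_.count).getOrElse(0L) + events.size)
      state.update(updated)
      updated
    }

With that shape, passing either the bare name or `updateAcrossEvents _` to .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents) should be accepted.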
Re: Spark Streaming with mapGroupsWithState
Maybe you can combine the fields you want to use into one field.

Something Something wrote on Tue, Mar 3, 2020 at 6:37 AM:

> I am writing a Stateful Streaming application in which I am using
> mapGroupsWithState to create aggregates for Groups, but I need to create
> Groups based on more than one column in the Input Row. How do I specify
> more than one field in 'groupByKey'? (Full message below.)
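One hedged reading of that suggestion, reusing withEventTime and MyReport from the quoted code: derive a single composite key from both fields before grouping. The "|" separator is an assumption and must not occur in the data.

    import spark.implicits._ // encoder for the String key

    val byCompositeKey = withEventTime
      .as[MyReport]
      .groupByKey(r => s"${r.getKeys.getKey1}|${r.getKeys.getKey2}")

A Tuple2 key (see the follow-up reply above) avoids the separator problem entirely, since Spark ships encoders for tuples of encodable types.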
Spark Streaming with mapGroupsWithState
I am writing a Stateful Streaming application in which I am using mapGroupsWithState to create aggregates for Groups, but I need to create *Groups based on more than one column in the Input Row*. All the examples in 'Spark: The Definitive Guide' use only one column, such as 'User' or 'Device'. I am using code similar to what's given below. *How do I specify more than one field in 'groupByKey'?*

There are other challenges as well. The book says we can use 'updateAcrossEvents' the way given below, but I get a compile-time error saying:

*Error:(43, 65) missing argument list for method updateAcrossEvents in object Main*
*Unapplied methods are only converted to functions when a function type is expected. You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.*
*.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)*

Another challenge: the compiler also complains about my *MyReport*:

*Error:(41, 12) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.*

Help in resolving these errors would be greatly appreciated. Thanks in advance.

    withEventTime
      .as[MyReport]
      .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
      .writeStream
      .queryName("test_query")
      .format("memory")
      .outputMode("update")
      .start()
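Two hedged notes on the errors above. First, groupByKey accepts any function of the row, so a tuple key handles multiple columns directly; Spark derives encoders for tuples of encodable types via spark.implicits._. Second, the 'Unable to find encoder' error usually means MyReport is not a Scala case class (the getter-style accessors suggest a generated Java class, which would need Encoders.bean or Encoders.kryo instead) or that spark.implicits._ is not in scope. A sketch under the assumption that case classes are used, with hypothetical field names:

    import org.apache.spark.sql.streaming.GroupStateTimeout
    import spark.implicits._ // encoders for case classes and tuples

    // Hypothetical case-class versions of the row type; Product types get
    // encoders for free once spark.implicits._ is imported.
    case class Keys(key1: String, key2: String)
    case class MyReport(keys: Keys, value: Long)

    withEventTime
      .as[MyReport]
      .groupByKey(r => (r.keys.key1, r.keys.key2)) // tuple key: both columns
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
      .writeStream
      .queryName("test_query")
      .format("memory")
      .outputMode("update")
      .start()

updateAcrossEvents is assumed to have the three-parameter shape sketched earlier in the thread, keyed by (String, String). Note also that EventTimeTimeout requires a watermark on the input (e.g. via withWatermark on withEventTime), or Spark will reject the query at start.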