When I try to run the code below, I get this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Generators are not supported when it's nested in expressions, but got:
generatorouter(explode(generatorouter(explode(json))));

StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("AckedState", DataTypes.StringType, true),
        DataTypes.createStructField("ConfirmedState", DataTypes.StringType, true),
        DataTypes.createStructField("Time", DataTypes.DoubleType, true),
        DataTypes.createStructField("ActiveState", DataTypes.StringType, true),
        DataTypes.createStructField("SourceNode", DataTypes.StringType, true),
        DataTypes.createStructField("SourceName", DataTypes.StringType, true),
        DataTypes.createStructField("Message", DataTypes.StringType, true),
        DataTypes.createStructField("Severity", DataTypes.IntegerType, true) });

Dataset<Row> df = sparkSession.readStream().format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("kafka.ssl.keystore.location", keystore)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.ssl.keystore.password", "abc")
        .option("kafka.ssl.truststore.location", truststore)
        .option("kafka.ssl.truststore.password", "abc")
        .option("kafka.sasl.jaas.config", jaasCfg)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("group.id", consumerGroupId)
        .option("value.serializer", serializer)
        .option("key.serializer", serializer)
        .option("session.timeout.ms", "30000")
        .option("subscribe", "alerts5")
        .load()
        .selectExpr("CAST(value AS STRING) as json");
df.printSchema();

Column col = new Column("json");
Dataset<Row> data = df.select(functions.from_json(col, schema).as("json"));
data.printSchema();

Dataset<Row> results = data.select(
        functions.explode_outer(functions.explode_outer(new Column("json"))));
results.printSchema();
results.show();

results.writeStream().format("console").option("truncate", false)
        .start().awaitTermination();

I am using functions.from_json.
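For reference, Spark's analyzer rejects one generator nested inside another, which is exactly what the doubly nested explode_outer produces. A sketch of the non-nested alternatives is below. It reuses the df, data, and schema variables from the code above; which branch applies depends on the actual payload shape (a single JSON object vs. a JSON array per Kafka message), which the post does not show, so treat both as assumptions.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;

// Case 1: each Kafka value is a single JSON object. Then from_json with a
// StructType yields a struct column, which cannot be exploded at all;
// flatten its fields with "json.*" instead of calling explode_outer:
Dataset<Row> flat = data.select(functions.col("json.*"));

// Case 2: each Kafka value is a JSON *array* of such objects. Then parse
// with an ArrayType wrapper around the schema and apply a single,
// non-nested explode_outer, flattening the resulting struct afterwards:
ArrayType arraySchema = DataTypes.createArrayType(schema);
Dataset<Row> exploded = df
        .select(functions.explode_outer(
                functions.from_json(functions.col("json"), arraySchema))
                .as("rec"))
        .select(functions.col("rec.*"));
```

If a second explode is genuinely needed (e.g. an array field inside each record), the usual pattern is to run each explode in its own select step rather than nesting the calls.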



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
