When I try to run the code below, I get this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException:
Generators are not supported when it's nested in expressions, but got:
generatorouter(explode(generatorouter(explode(json))));


StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("AckedState", DataTypes.StringType, true),
        DataTypes.createStructField("ConfirmedState", DataTypes.StringType, true),
        DataTypes.createStructField("Time", DataTypes.DoubleType, true),
        DataTypes.createStructField("ActiveState", DataTypes.StringType, true),
        DataTypes.createStructField("SourceNode", DataTypes.StringType, true),
        DataTypes.createStructField("SourceName", DataTypes.StringType, true),
        DataTypes.createStructField("Message", DataTypes.StringType, true),
        DataTypes.createStructField("Severity", DataTypes.IntegerType, true) });


Dataset<Row> df = sparkSession.readStream().format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.ssl.keystore.location", keystore)
        .option("kafka.ssl.keystore.password", "abc")
        .option("kafka.ssl.truststore.location", truststore)
        .option("kafka.ssl.truststore.password", "abc")
        .option("kafka.sasl.jaas.config", jaasCfg)
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("group.id", consumerGroupId)
        .option("value.serializer", serializer)
        .option("key.serializer", serializer)
        .option("session.timeout.ms", "30000")
        .option("subscribe", "alerts5")
        .load()
        .selectExpr("CAST(value AS STRING) AS json");


                df.printSchema();


                Column col = new Column("json");


Dataset<Row> data = df.select(functions.from_json(col, schema).as("json"));
data.printSchema();

Dataset<Row> results = data.select(
        functions.explode_outer(functions.explode_outer(new Column("json"))));

results.printSchema();
results.show();
results.writeStream().format("console").option("truncate", false)
        .start().awaitTermination();
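The analyzer rejects a generator nested inside another expression, so two explode_outer calls cannot be stacked like this. If the Kafka value were actually a JSON array of these records, a single explode_outer over from_json with an ArrayType would be allowed. A minimal local-mode sketch (the class name and in-memory sample data are made up for illustration; the Kafka stream is not involved):

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class ExplodeArraySketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("explode-array-sketch").getOrCreate();

        // Element type mirrors part of the alert schema from the question.
        StructType element = new StructType()
                .add("Message", DataTypes.StringType)
                .add("Severity", DataTypes.IntegerType);
        ArrayType arrayOfAlerts = DataTypes.createArrayType(element);

        // Hypothetical payload: one Kafka value carrying an array of alerts.
        Dataset<Row> df = spark.createDataset(
                Collections.singletonList(
                        "[{\"Message\":\"m1\",\"Severity\":1},{\"Message\":\"m2\",\"Severity\":2}]"),
                Encoders.STRING()).toDF("json");

        // One generator at the top level of the select list is allowed;
        // nesting explode_outer(explode_outer(...)) is what the analyzer rejects.
        Dataset<Row> results = df
                .select(functions.explode_outer(
                        functions.from_json(functions.col("json"), arrayOfAlerts)).as("rec"))
                .select("rec.*");

        System.out.println(results.count());   // one row per array element
        System.out.println(results.first().<String>getAs("Message"));
        spark.stop();
    }
}
```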

I am using functions.from_json.
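Since the declared schema is a plain struct rather than an array, from_json already yields one struct column per row, so no explode is needed at all; selecting "json.*" flattens the struct. A minimal local-mode sketch with made-up sample data (the streaming Kafka source is replaced by an in-memory Dataset for illustration):

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class FlattenStructSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("flatten-struct-sketch").getOrCreate();

        // Subset of the alert schema from the question.
        StructType schema = new StructType()
                .add("AckedState", DataTypes.StringType)
                .add("Severity", DataTypes.IntegerType);

        Dataset<Row> df = spark.createDataset(
                Collections.singletonList("{\"AckedState\":\"Acked\",\"Severity\":3}"),
                Encoders.STRING()).toDF("json");

        // from_json produces a single struct column; "json.*" flattens it
        // into top-level columns without any generator.
        Dataset<Row> flat = df
                .select(functions.from_json(functions.col("json"), schema).as("json"))
                .select("json.*");

        Row r = flat.first();
        System.out.println(r.<String>getAs("AckedState") + "," + r.<Integer>getAs("Severity"));
        spark.stop();
    }
}
```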



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
