When I run the code below, I get this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Generators are not supported when it's nested in expressions, but got: generatorouter(explode(generatorouter(explode(json))));

    // Schema of the JSON payload carried in the Kafka message value
    StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("AckedState", DataTypes.StringType, true),
            DataTypes.createStructField("ConfirmedState", DataTypes.StringType, true),
            DataTypes.createStructField("Time", DataTypes.DoubleType, true),
            DataTypes.createStructField("ActiveState", DataTypes.StringType, true),
            DataTypes.createStructField("SourceNode", DataTypes.StringType, true),
            DataTypes.createStructField("SourceName", DataTypes.StringType, true),
            DataTypes.createStructField("Message", DataTypes.StringType, true),
            DataTypes.createStructField("Severity", DataTypes.IntegerType, true) });

    // Read the topic as a stream and keep the value as a JSON string
    Dataset<Row> df = sparkSession.readStream().format("kafka")
            .option("kafka.bootstrap.servers", brokers)
            .option("kafka.ssl.keystore.location", keystore)
            .option("kafka.security.protocol", "SASL_SSL")
            .option("kafka.ssl.keystore.password", "abc")
            .option("kafka.ssl.truststore.location", truststore)
            .option("kafka.ssl.truststore.password", "abc")
            .option("kafka.sasl.jaas.config", jaasCfg)
            .option("kafka.sasl.mechanism", "PLAIN")
            .option("group.id", consumerGroupId)
            .option("value.serializer", serializer)
            .option("key.serializer", serializer)
            .option("session.timeout.ms", "30000")
            .option("subscribe", "alerts5")
            .load()
            .selectExpr("CAST(value AS STRING) as json");
    df.printSchema();

    // Parse the JSON string into a struct column named "json"
    Column col = new Column("json");
    Dataset<Row> data = df.select(functions.from_json(col, schema).as("json"));
    data.printSchema();

    // This select is the one that raises the AnalysisException
    Dataset<Row> results = data.select(
            functions.explode_outer(functions.explode_outer(new Column("json"))));
    results.printSchema();
    results.show();

    results.writeStream().format("console").option("truncate", false)
            .start().awaitTermination();

I am using functions.from_json.
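
The error message says that one generator (explode_outer) is wrapped inside another, which Spark does not allow within a single expression. In addition, from_json with a StructType schema produces a struct column, and explode only accepts array or map input, so un-nesting the two calls alone would probably not be enough. A minimal sketch of one possible approach follows, assuming each Kafka message holds a single JSON object matching the schema. It uses a local batch Dataset with a hypothetical sample payload and only two of the schema fields, so the class name, the sample values, and the local master are all assumptions for illustration, not part of the original job.

```java
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical class name; a batch stand-in for the streaming job in the post.
public class FlattenAlertsSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("FlattenAlertsSketch")
                .master("local[*]") // assumption: local run for illustration
                .getOrCreate();

        // Trimmed-down version of the schema from the post (two fields only).
        StructType schema = DataTypes.createStructType(new StructField[] {
                DataTypes.createStructField("Message", DataTypes.StringType, true),
                DataTypes.createStructField("Severity", DataTypes.IntegerType, true) });

        // Hypothetical sample payload standing in for the Kafka value column.
        Dataset<Row> df = spark
                .createDataset(
                        Collections.singletonList("{\"Message\":\"pump offline\",\"Severity\":3}"),
                        Encoders.STRING())
                .toDF("json");

        // Same parsing step as in the post: "json" becomes a struct column.
        Dataset<Row> data = df.select(
                functions.from_json(functions.col("json"), schema).as("json"));

        // A struct is flattened with "json.*"; no generator is involved.
        Dataset<Row> results = data.select("json.*");
        results.printSchema();
        results.show(false);

        // If a column really were array<array<T>>, each generator would need
        // its own select step instead of being nested, for example:
        //   Dataset<Row> inner = ds.select(
        //           functions.explode_outer(functions.col("nested")).as("inner"));
        //   Dataset<Row> items = inner.select(
        //           functions.explode_outer(functions.col("inner")).as("item"));

        spark.stop();
    }
}
```

In the streaming version, the same select("json.*") should apply to the Dataset returned by from_json. Note that show() is a batch action and is not expected to work on a streaming Dataset; the writeStream().format("console") sink already in the post covers that purpose.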