How to map DataSet row to Struct in java?

2020-07-27 Thread anuragDada
When i try to use this below code getting this error :*Exception in thread
"main" org.apache.spark.sql.AnalysisException: Generators are not supported
when it's nested in expressions, but got:
generatorouter(explode(generatorouter(explode(json;*StructType schema =
DataTypes.createStructType( new StructField[] {
DataTypes.createStructField("AckedState", DataTypes.StringType, true),  

DataTypes.createStructField("ConfirmedState", DataTypes.StringType, true),  

DataTypes.createStructField("Time", DataTypes.DoubleType, true),

DataTypes.createStructField("ActiveState", DataTypes.StringType, true), 

DataTypes.createStructField("SourceNode", DataTypes.StringType, true),  

DataTypes.createStructField("SourceName", DataTypes.StringType, true),  

DataTypes.createStructField("Message", DataTypes.StringType, true), 

DataTypes.createStructField("Severity", DataTypes.IntegerType, true) });
Dataset df =
sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers",
brokers).option("kafka.ssl.keystore.location",
keystore).option("kafka.security.protocol", "SASL_SSL") 
.option("kafka.ssl.keystore.password",
"abc").option("kafka.ssl.truststore.location", truststore)  
.option("kafka.ssl.truststore.password",
"abc").option("kafka.sasl.jaas.config", jaasCfg)
.option("kafka.sasl.mechanism", "PLAIN").option("group.id", consumerGroupId)

.option("value.serializer", serializer).option("key.serializer", serializer)

.option("session.timeout.ms", "3").option("subscribe", "alerts5").load()

.selectExpr("CAST(value AS STRING) as json");   df.printSchema();   
Column col
= new Column("json");   Dataset data = 
df.select(functions.from_json(col,
schema).as("json"));data.printSchema(); 
   Dataset results =
data.select(functions.explode_outer(functions.explode_outer(new
Column("json";results.printSchema(); 
results.show(); 
results.writeStream().format("console").option("truncate",   
false).start().awaitTermination();i am using function.from_json



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

How to map DataSet row to Struct in java?

2020-07-27 Thread anuragDada
When i try to use this below code getting this error :
*Exception in thread "main" org.apache.spark.sql.AnalysisException:
Generators are not supported when it's nested in expressions, but got:
generatorouter(explode(generatorouter(explode(json;
*


StructType schema = DataTypes.createStructType(
new StructField[] { 
DataTypes.createStructField("AckedState",
DataTypes.StringType, true),

DataTypes.createStructField("ConfirmedState", DataTypes.StringType,
true),

DataTypes.createStructField("Time", DataTypes.DoubleType, true),

DataTypes.createStructField("ActiveState", DataTypes.StringType,
true),

DataTypes.createStructField("SourceNode", DataTypes.StringType, true),

DataTypes.createStructField("SourceName", DataTypes.StringType, true),

DataTypes.createStructField("Message", DataTypes.StringType, true),

DataTypes.createStructField("Severity", DataTypes.IntegerType, true)
});


Dataset df =
sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers",
brokers)
.option("kafka.ssl.keystore.location",
keystore).option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.keystore.password",
"abc").option("kafka.ssl.truststore.location", truststore)
.option("kafka.ssl.truststore.password",
"abc").option("kafka.sasl.jaas.config", jaasCfg)
.option("kafka.sasl.mechanism", 
"PLAIN").option("group.id",
consumerGroupId)
.option("value.serializer", 
serializer).option("key.serializer",
serializer)
.option("session.timeout.ms", 
"3").option("subscribe",
"alerts5").load()
.selectExpr("CAST(value AS STRING) as json");


df.printSchema();


Column col = new Column("json");


Dataset data = df.select(functions.from_json(col,
schema).as("json"));
data.printSchema();



   Dataset results =
data.select(functions.explode_outer(functions.explode_outer(new
Column("json";

  results.printSchema(); results.show();
  results.writeStream().format("console").option("truncate",
  false).start().awaitTermination();

i am using function.from_json



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org