[ https://issues.apache.org/jira/browse/SPARK-24089?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
kumar updated SPARK-24089:
--------------------------
Description:

I am completely stuck on this issue and unable to make further progress. For more information, please refer to this post: [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]

I want to load multiple files one by one rather than loading all files at once. To achieve this I used SaveMode.Append, so that the second file's data is appended to the first file's data in the table, but it throws an exception:

{code:java}
Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false;;
'InsertIntoTable LogicalRDD [a1#4, b1#5, c1#6, d1#7], false, false, false
+- LogicalRDD [a1#22, b1#23, c1#24, d1#25], false
{code}

Code:

{code:java}
package com.log;

import com.log.common.RegexMatch;
import com.log.spark.SparkProcessor;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

import java.util.ArrayList;
import java.util.List;

public class TestApp {

    private SparkSession spark;
    private SparkContext sparkContext;
    private SQLContext sqlContext;

    Dataset<Row> logDataFrame;

    public TestApp() {
        SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();
        SparkContext sc = spark.sparkContext();
        this.spark = spark;
        this.sparkContext = sc;
    }

    public static void main(String[] args) {
        TestApp app = new TestApp();
        String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
                "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
        for (String file : afiles) {
            app.writeFileToSchema(file);
        }
    }

    public void writeFileToSchema(String filePath) {
        StructType schema = getSchema();
        JavaRDD<Row> rowRDD = getRowRDD(filePath);

        if (spark.catalog().tableExists("mylogs")) {
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("temptable");
            logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs"); // exception thrown here
        } else {
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("mylogs");
        }

        Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
        List<Row> allrows = results.collectAsList();
        System.out.println("Count:" + allrows);

        sqlContext = logDataFrame.sqlContext();
    }

    public List<Row> getTagList() {
        Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
        List<Row> allrows = results.collectAsList();
        return allrows;
    }

    public StructType getSchema() {
        String schemaString = "a1 b1 c1 d1";
        List<StructField> fields = new ArrayList<>();
        for (String fieldName : schemaString.split(" ")) {
            StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
            fields.add(field);
        }
        StructType schema = DataTypes.createStructType(fields);
        return schema;
    }

    public JavaRDD<Row> getRowRDD(String filePath) {
        JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
        RegexMatch reg = new RegexMatch();
        JavaRDD<Row> rowRDD = logRDD
                .map((Function<String, Row>) line -> {
                    String[] st = line.split(" ");
                    return RowFactory.create(st[0], st[1], st[2], st[3]);
                });
        rowRDD.persist(StorageLevel.MEMORY_ONLY());
        return rowRDD;
    }
}
{code}
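Below is a minimal, untested sketch of one possible alternative: writing each file's rows to a catalog (managed) table with saveAsTable instead of calling insertInto on a temp view. It reuses the spark field and the getSchema()/getRowRDD() helpers from the class above, and it assumes Spark can create a managed table named "mylogs" in the default warehouse directory; this is an assumption about a workaround, not a confirmed fix.

{code:java}
// Hypothetical sketch, not a confirmed fix: append each file's rows to a
// managed catalog table instead of inserting into an RDD-backed temp view.
// Assumes the default Spark warehouse directory is writable.
public void appendFileToTable(String filePath) {
    StructType schema = getSchema();
    JavaRDD<Row> rowRDD = getRowRDD(filePath);
    Dataset<Row> df = spark.createDataFrame(rowRDD, schema);

    if (spark.catalog().tableExists("mylogs")) {
        // Table already exists in the catalog: append the new file's rows.
        df.write().mode(SaveMode.Append).saveAsTable("mylogs");
    } else {
        // First file: create the managed table from this DataFrame.
        df.write().mode(SaveMode.ErrorIfExists).saveAsTable("mylogs");
    }

    Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
    System.out.println("Count:" + results.collectAsList());
}
{code}

The idea behind the sketch (again, an assumption) is that insertInto targets a table already registered in the catalog, whereas a temp view created over an RDD-backed DataFrame is not an insertable relation; saveAsTable sidesteps this by letting Spark create and manage the target table itself.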
> DataFrame.write().mode(SaveMode.Append).insertInto(TABLE)
> ----------------------------------------------------------
>
>                 Key: SPARK-24089
>                 URL: https://issues.apache.org/jira/browse/SPARK-24089
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.0
>            Reporter: kumar
>            Priority: Blocker
>              Labels: bug