kumar created SPARK-24089:
-----------------------------

             Summary: DataFrame.write.mode(SaveMode.Append).insertInto(TABLE)
                 Key: SPARK-24089
                 URL: https://issues.apache.org/jira/browse/SPARK-24089
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, SQL
    Affects Versions: 2.3.0
            Reporter: kumar
I am completely stuck on this issue and unable to progress further. For more information, please refer to this post: [https://stackoverflow.com/questions/49994085/spark-sql-2-3-dataframe-savemode-append-issue]

I want to load multiple files one by one, not all files at once. To achieve this I used SaveMode.Append, so that the second file's data would be added to the first file's data in the table, but it throws an exception.

Code:
{code:java}
package com.log;

import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.storage.StorageLevel;

import java.util.ArrayList;
import java.util.List;

public class TestApp {

    private SparkSession spark;
    private SparkContext sparkContext;
    private SQLContext sqlContext;
    private Dataset<Row> logDataFrame;

    public TestApp() {
        SparkSession spark = SparkSession.builder().appName("Simple Application")
                .config("spark.master", "local").getOrCreate();
        this.spark = spark;
        this.sparkContext = spark.sparkContext();
    }

    public static void main(String[] args) {
        TestApp app = new TestApp();
        String[] afiles = {"C:\\Users\\test\\Desktop\\logs\\log1.txt",
                "C:\\Users\\test\\Desktop\\logs\\log2.txt"};
        // Load the files one by one; the second iteration should append.
        for (String file : afiles) {
            app.writeFileToSchema(file);
        }
    }

    public void writeFileToSchema(String filePath) {
        StructType schema = getSchema();
        JavaRDD<Row> rowRDD = getRowRDD(filePath);
        if (spark.catalog().tableExists("mylogs")) {
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("temptable");
            logDataFrame.write().mode(SaveMode.Append).insertInto("mylogs"); // exception thrown here
        } else {
            logDataFrame = spark.createDataFrame(rowRDD, schema);
            logDataFrame.createOrReplaceTempView("mylogs");
        }
        Dataset<Row> results = spark.sql("SELECT count(b1) FROM mylogs");
        List<Row> allrows = results.collectAsList();
        System.out.println("Count:" + allrows);
        sqlContext = logDataFrame.sqlContext();
    }

    public List<Row> getTagList() {
        Dataset<Row> results = sqlContext.sql("SELECT distinct(b1) FROM mylogs");
        return results.collectAsList();
    }

    // Builds a four-column string schema: a1, b1, c1, d1.
    public StructType getSchema() {
        String schemaString = "a1 b1 c1 d1";
        List<StructField> fields = new ArrayList<>();
        for (String fieldName : schemaString.split(" ")) {
            fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
        }
        return DataTypes.createStructType(fields);
    }

    // Splits each line on spaces and keeps the first four tokens as a Row.
    public JavaRDD<Row> getRowRDD(String filePath) {
        JavaRDD<String> logRDD = sparkContext.textFile(filePath, 1).toJavaRDD();
        JavaRDD<Row> rowRDD = logRDD
                .map((Function<String, Row>) line -> {
                    String[] st = line.split(" ");
                    return RowFactory.create(st[0], st[1], st[2], st[3]);
                });
        rowRDD.persist(StorageLevel.MEMORY_ONLY());
        return rowRDD;
    }
}
{code}
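For context, the underlying issue appears to be that {{insertInto}} resolves its target against the session catalog, while {{createOrReplaceTempView}} only registers a temporary view, so there is no real table named {{mylogs}} for the second write to insert into. Below is a minimal sketch of one possible workaround, not the reporter's code: it assumes a plain-text input and Spark's locally managed warehouse, and uses {{saveAsTable}} with {{SaveMode.Append}}, which creates the table on the first call and appends on later calls.

{code:java}
// Hypothetical sketch, not the reporter's code: appends each file's rows to a
// catalog-managed table. File paths and the table name "mylogs" are
// placeholders for illustration.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class AppendPerFile {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Append Per File")
                .config("spark.master", "local")
                .getOrCreate();

        String[] files = {"log1.txt", "log2.txt"}; // placeholder paths
        for (String file : files) {
            // Each file becomes a DataFrame with a single string column "value".
            Dataset<Row> df = spark.read().text(file);
            // Unlike insertInto, saveAsTable does not require the table to
            // pre-exist: with SaveMode.Append it creates "mylogs" on the first
            // iteration and appends rows on subsequent iterations.
            df.write().mode(SaveMode.Append).saveAsTable("mylogs");
        }

        spark.sql("SELECT count(*) FROM mylogs").show();
        spark.stop();
    }
}
{code}

One caveat worth noting if the exception persists against a real table: {{insertInto}} matches columns by position rather than by name, so the DataFrame's column order must match the table's.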