Hi Community,

I am trying to use CarbonData to store data that is ingested continuously and retained for a long period of time, e.g. one year. To handle the data throughput, I need to load multiple CSV files concurrently. Along the way I have come across a couple of issues. I am testing with the latest CarbonData release, 1.6.1.
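To give some context, the load path looks roughly like the minimal sketch below. The table name, store path, and CSV paths are made up for illustration; the CarbonSession bootstrap and the LOAD DATA INPATH statement are the standard CarbonData 1.x API:

import java.util.concurrent.Executors

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._

// Matches the master = local[16] setup mentioned in issue #3 below.
val spark = SparkSession.builder()
  .master("local[16]")
  .appName("ConcurrentLoadTest")
  .getOrCreateCarbonSession("/tmp/carbonstore")  // store path is hypothetical

// Four threads, each loading its own CSV batch into the same table.
val pool = Executors.newFixedThreadPool(4)
(1 to 4).foreach { i =>
  pool.submit(new Runnable {
    override def run(): Unit = {
      spark.sql(s"LOAD DATA INPATH '/data/batch_$i.csv' INTO TABLE my_table")
    }
  })
}
pool.shutdown()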
The issues:

1. The table becomes inconsistent when concurrent loads are running [https://issues.apache.org/jira/browse/CARBONDATA-3573]. I applied a temporary fix for this, proceeded, and then ran into issue #2.

2. Table compaction fails when concurrent loads run together with compaction. Compaction works fine for a while, then it fails, and after that compaction never works for that table again. (A sketch of how compaction is triggered alongside the loads follows the stack trace at the end of this mail.)

3. Even though I run the data load in multiple threads (4 threads), I observe at most 2 concurrent loads running at any time (from the Spark UI Executors page). I am running CarbonData with master = local[16].

4. Loading data into CarbonData becomes slower as the tablestatus file grows bigger. I believe this will affect queries as well. [posted in another thread - http://apache-carbondata-mailing-list-archive.1130556.n5.nabble.com/Data-Load-performance-degrade-when-number-of-segment-increase-tp86031.html]

Does anyone have experience with concurrent data loading in CarbonData, and has anyone faced the same issues? Any pointers on how to fix issue #2? Any plans or ideas on how to improve issues #3 and #4?

Regards,
Chin Wei

The stack trace for issue #2:

ERROR CarbonTableCompactor:88 - Exception in compaction thread null
java.lang.NullPointerException
        at org.apache.carbondata.core.datamap.TableDataMap.prune(TableDataMap.java:122)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getPrunedBlocklets(CarbonInputFormat.java:590)
        at org.apache.carbondata.hadoop.api.CarbonInputFormat.getDataBlocksOfSegment(CarbonInputFormat.java:503)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:461)
        at org.apache.carbondata.hadoop.api.CarbonTableInputFormat.getSplits(CarbonTableInputFormat.java:197)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:382)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD$$anonfun$internalGetPartitions$3.apply(CarbonMergerRDD.scala:363)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.carbondata.spark.rdd.CarbonMergerRDD.internalGetPartitions(CarbonMergerRDD.scala:363)
        at org.apache.carbondata.spark.rdd.CarbonRDD.getPartitions(CarbonRDD.scala:68)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
        at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.triggerCompaction(CarbonTableCompactor.scala:204)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.scanSegmentsAndSubmitJob(CarbonTableCompactor.scala:136)
        at org.apache.carbondata.spark.rdd.CarbonTableCompactor.executeCompaction(CarbonTableCompactor.scala:85)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$$anon$2.run(CarbonDataRDDFactory.scala:180)
        at org.apache.carbondata.spark.rdd.CarbonDataRDDFactory$.startCompactionThreads(CarbonDataRDDFactory.scala:287)
        at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.alterTableForCompaction(CarbonAlterTableCompactionCommand.scala:338)
        at org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand.processData(CarbonAlterTableCompactionCommand.scala:202)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:148)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand$$anonfun$run$3.apply(package.scala:145)
        at org.apache.spark.sql.execution.command.Auditable$class.runWithAudit(package.scala:104)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand.runWithAudit(package.scala:141)
        at org.apache.spark.sql.execution.command.AtomicRunnableCommand.run(package.scala:145)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
        at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3259)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3258)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
        at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:91)
        at org.apache.spark.sql.CarbonSession$$anonfun$sql$1.apply(CarbonSession.scala:90)
        at org.apache.spark.sql.CarbonSession.withProfiler(CarbonSession.scala:136)
        at org.apache.spark.sql.CarbonSession.sql(CarbonSession.scala:88)
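For reference, the compaction that produces this trace is triggered roughly as in the sketch below, continuing from the load sketch above. The interval and table name are again made up, but ALTER TABLE ... COMPACT 'MINOR' is the standard CarbonData compaction statement:

import java.util.concurrent.{Executors, TimeUnit}

// Runs in parallel with the four load threads from the first sketch.
val compactor = Executors.newSingleThreadScheduledExecutor()
compactor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // This is the CarbonSession.sql call at the bottom of the trace;
    // it works for a while and then starts failing with the NPE above.
    spark.sql("ALTER TABLE my_table COMPACT 'MINOR'")
  }
}, 1, 10, TimeUnit.MINUTES)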