[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r187136017
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

@kumarvishal09 : Tested with Parquet by having 2 files with the same column name but different data types. Parquet throws java.lang.UnsupportedOperationException during read.

Caused by: java.lang.UnsupportedOperationException: Unimplemented type: StringType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:369)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:188)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
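The strict list comparison that the quoted diff gates reads on (`compareColumnSchemaList`) can be sketched roughly as follows. This is a hypothetical standalone simplification with made-up `Column` fields, not CarbonData's actual implementation:

```java
import java.util.Arrays;
import java.util.List;

public class SchemaCompareSketch {
  // Simplified stand-in for CarbonData's ColumnSchema: just name and data type.
  static final class Column {
    final String name;
    final String dataType;
    Column(String name, String dataType) { this.name = name; this.dataType = dataType; }
    boolean strictEquals(Column other) {
      return name.equals(other.name) && dataType.equals(other.dataType);
    }
  }

  // All files must expose the same columns in the same order;
  // any single mismatch fails the whole comparison.
  static boolean compareColumnSchemaList(List<Column> a, List<Column> b) {
    if (a.size() != b.size()) {
      return false;
    }
    for (int i = 0; i < a.size(); i++) {
      if (!a.get(i).strictEquals(b.get(i))) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Column> table = Arrays.asList(new Column("id", "INT"), new Column("name", "STRING"));
    List<Column> sameFile = Arrays.asList(new Column("id", "INT"), new Column("name", "STRING"));
    List<Column> badFile = Arrays.asList(new Column("id", "STRING"), new Column("name", "STRING"));
    System.out.println(compareColumnSchemaList(table, sameFile)); // true
    System.out.println(compareColumnSchemaList(table, badFile));  // false
  }
}
```

The second comparison models exactly the Parquet test above: same column name, different data type.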





---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread kumarvishal09
Github user kumarvishal09 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r187116998
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(same CarbonTableInputFormat hunk as quoted in the first message above)
--- End diff --

@kunal642 @sounakr I agree with @gvramana: skipping a data file is not correct, as it will miss some records, which is not acceptable. Blocking the user while writing is not possible. I think throwing an exception is correct. @ajantha-bhat Can you please check how Parquet behaves in a similar scenario?


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread gvramana
Github user gvramana commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r187071062
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(same CarbonTableInputFormat hunk as quoted in the first message above)
--- End diff --

@kunal642 , @sounakr. Data files should not be skipped; a clear error should be given to the user. Otherwise the user thinks the result is correct and was computed considering all files. Along with the exception, the file whose data mismatches also needs to be logged so the user can analyse further and fix it.
Later, a carbon print tool will be provided to check the schema of each carbondata file, which will help the user debug the problem.
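The suggestion above — name the offending file in the error — could look like this small sketch. The message text and helper name are illustrative, not what the PR ends up using:

```java
import java.io.IOException;

public class MismatchErrorSketch {
  // Build an error that tells the user exactly which index file
  // disagrees with the table schema, instead of a generic message.
  static IOException schemaMismatch(String indexFilePath, String tableName) {
    return new IOException("Schema of all files must match for table '" + tableName
        + "', but index file '" + indexFilePath + "' has a different schema.");
  }

  public static void main(String[] args) {
    IOException e = schemaMismatch("/tmp/sdk_out/part-0.carbonindex", "sdk_table");
    System.out.println(e.getMessage());
  }
}
```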


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread sounakr
Github user sounakr commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r187030343
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(same CarbonTableInputFormat hunk as quoted in the first message above)
--- End diff --

@kunal642 I agree with you. The purpose of the SDK is to read whatever files are present. If there is a mismatch in the schema, we should not block the output of the files that do have the correct schema.
Also, in the future we are going to support Merge Schema and show the output even when the schemas differ.
Better to show the output of whatever can be read with the correct schema, and also log a warning about the presence of a different schema.
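The Merge Schema idea mentioned above might, in a very rough sketch, take the union of column names. This is purely hypothetical — CarbonData's eventual merge semantics may differ, especially for conflicting data types, which are treated as a hard error here:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MergeSchemaSketch {
  // Merge two schemas (column name -> data type) by taking the union of columns.
  // The same column name with two different data types is treated as a conflict.
  static Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
    Map<String, String> merged = new LinkedHashMap<>(a);
    for (Map.Entry<String, String> e : b.entrySet()) {
      String existing = merged.putIfAbsent(e.getKey(), e.getValue());
      if (existing != null && !existing.equals(e.getValue())) {
        throw new IllegalArgumentException("Conflicting types for column " + e.getKey());
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    Map<String, String> file1 = new LinkedHashMap<>();
    file1.put("id", "INT");
    file1.put("name", "STRING");
    Map<String, String> file2 = new LinkedHashMap<>();
    file2.put("id", "INT");
    file2.put("age", "INT");
    System.out.println(merge(file1, file2)); // {id=INT, name=STRING, age=INT}
  }
}
```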


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r187007180
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(same CarbonTableInputFormat hunk as quoted in the first message above)
--- End diff --

@kunal642 : For nonTransactional tables, we support output files from many SDK writers being placed in, and read from, the same folder. This works when the schema is the same. If the schema is different, we have to inform the user that these files are not of the same type. If we just ignore the files, how would the user know why they were ignored? Hence the exception.


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r186995311
  
--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---
@@ -2311,14 +2312,14 @@ static DataType thriftDataTyopeToWrapperDataType(
 }
   }
 
-  public static List<String> getFilePathExternalFilePath(String path) {
+  public static List<String> getFilePathExternalFilePath(String path, final String fileExtension) {
--- End diff --

This change is not required


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r186992244
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(same CarbonTableInputFormat hunk as quoted in the first message above)
--- End diff --

I don't think throwing an exception is the correct approach. Either we should not let the user write data with a different schema, or we should infer the schema from the first index file that was written and skip any index file with a different schema while preparing splits.

@sounakr @kumarvishal09 what do you think?
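The skip-on-mismatch alternative described here — infer from the first index file and drop any file whose schema differs while preparing splits — could be sketched as below. This is hypothetical (schemas reduced to strings), and the thread ultimately rejects silent skipping in favour of a clear error:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SkipMismatchSketch {
  // Keep only index files whose schema matches the first file's schema.
  // Files that disagree are silently dropped, which is exactly why
  // reviewers objected: the user never learns records were skipped.
  static List<String> filterByFirstSchema(Map<String, String> fileToSchema) {
    List<String> kept = new ArrayList<>();
    String reference = null;
    for (Map.Entry<String, String> e : fileToSchema.entrySet()) {
      if (reference == null) {
        reference = e.getValue(); // first file defines the schema
      }
      if (reference.equals(e.getValue())) {
        kept.add(e.getKey());
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, String> files = new LinkedHashMap<>();
    files.put("f1.carbonindex", "id:INT,name:STRING");
    files.put("f2.carbonindex", "id:INT,name:STRING");
    files.put("f3.carbonindex", "id:STRING,name:STRING"); // mismatching type
    System.out.println(filterByFirstSchema(files)); // [f1.carbonindex, f2.carbonindex]
  }
}
```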


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-09 Thread kunal642
Github user kunal642 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r186992068
  
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---
(same CarbonTableInputFormat hunk as quoted in the first message above)
--- End diff --

I don't think throwing an exception is the correct approach. Either we should not let the user write data with a different schema, or we should infer the schema from the first index file that was written and skip any index file with a different schema while preparing splits.


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-08 Thread ajantha-bhat
Github user ajantha-bhat commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r186725643
  
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java ---
@@ -342,6 +342,30 @@ public void setParentColumnTableRelations(
     return true;
   }
 
+  /**
+   * method to compare columnSchema,
+   * other parameters along with just column name and column data type
+   * @param obj
+   * @return
+   */
+  public boolean equalsWithStrictCheck(Object obj) {
+    if (!this.equals(obj)) {
+      return false;
+    }
+    ColumnSchema other = (ColumnSchema) obj;
+    if (!columnUniqueId.equals(other.columnUniqueId) ||
+        (isDimensionColumn != other.isDimensionColumn) ||
+        (scale != other.scale) ||
+        (precision != other.precision) ||
+        (isSortColumn != other.isSortColumn)) {
+      return false;
+    }
+    if (encodingList.size() != other.encodingList.size()) {
--- End diff --

done. added


---


[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...

2018-05-08 Thread kumarvishal09
Github user kumarvishal09 commented on a diff in the pull request:

https://github.com/apache/carbondata/pull/2273#discussion_r186696437
  
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java ---
(same ColumnSchema hunk as quoted in the message above)
--- End diff --

Better to check the encoding values also... this is a generic method and can be useful in other scenarios.
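Checking the encoding values element by element, as suggested, might look like the sketch below. The `Encoding` enum here is a stand-in for CarbonData's thrift-generated type, and the ordered comparison is an assumption about the intended semantics:

```java
import java.util.Arrays;
import java.util.List;

public class EncodingCompareSketch {
  // Stand-in for org.apache.carbondata.core.metadata.encoder.Encoding.
  enum Encoding { DICTIONARY, RLE, INVERTED_INDEX, DIRECT_DICTIONARY }

  // Compare both the size and the actual encoding values, in order,
  // rather than the size alone.
  static boolean sameEncodings(List<Encoding> a, List<Encoding> b) {
    if (a.size() != b.size()) {
      return false;
    }
    for (int i = 0; i < a.size(); i++) {
      if (a.get(i) != b.get(i)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Encoding> x = Arrays.asList(Encoding.DICTIONARY, Encoding.RLE);
    List<Encoding> y = Arrays.asList(Encoding.DICTIONARY, Encoding.RLE);
    List<Encoding> z = Arrays.asList(Encoding.DICTIONARY, Encoding.INVERTED_INDEX);
    System.out.println(sameEncodings(x, y)); // true
    System.out.println(sameEncodings(x, z)); // false
  }
}
```

A size-only check would wrongly accept `x` and `z` above, which is the gap the review comment points at.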


---