[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user ajantha-bhat commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r187136017

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

@kumarvishal09: Tested with Parquet by having two files with the same column name but different data types. Parquet throws java.lang.UnsupportedOperationException during the read:
Caused by: java.lang.UnsupportedOperationException: Unimplemented type: StringType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readIntBatch(VectorizedColumnReader.java:369)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:188)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

---
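For reference, a minimal sketch of how the Parquet behaviour described above can be reproduced. The SparkSession setup and file paths are illustrative and not taken from the PR; the exact exception message depends on which file's footer wins schema inference.

    import org.apache.spark.sql.SparkSession;

    public class ParquetMixedSchemaRepro {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();

        // Two Parquet outputs with the same column name "c1" but different data types.
        spark.range(10).selectExpr("CAST(id AS INT) AS c1").write().parquet("/tmp/mixed/p1");     // c1: int
        spark.range(10).selectExpr("CAST(id AS STRING) AS c1").write().parquet("/tmp/mixed/p2");  // c1: string

        // Reading both together infers a single schema from one footer; materialising rows from
        // the other file fails inside the vectorized reader with UnsupportedOperationException.
        spark.read().parquet("/tmp/mixed/p1", "/tmp/mixed/p2").show();
      }
    }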
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user kumarvishal09 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r187116998

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

@kunal642 @sounakr I agree with @gvramana: skipping data files is not correct, as it would miss some records, which is not acceptable. Blocking the user while writing is not possible. I think throwing an exception is correct. @ajantha-bhat Can you please check how Parquet behaves in a similar scenario.

---
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r187071062

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

@kunal642, @sounakr: Data files should not be skipped; a clear error should be given to the user. Otherwise the user assumes the result is correct and was computed over all the files. Along with the exception, the file whose schema mismatches also needs to be logged so the user can analyse it further and fix it. Later, a carbon print tool will be provided to check the schema of each carbondata file, which will help the user debug the problem.

---
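For illustration, the suggestion above (log exactly which file has the mismatch before failing) could look roughly like this inside the quoted block; the LOGGER field and the message text are assumptions, not the PR's actual code:

          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
            // Record which index file disagrees with the inferred schema so the
            // user can analyse and fix that particular file.
            LOGGER.error("Schema of index file " + indexFile
                + " does not match the inferred schema of table " + carbonTable.getTableName());
            throw new IOException("All the files schema doesn't match. Mismatching file: " + indexFile);
          }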
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user sounakr commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r187030343

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

@kunal642 I agree with you. The purpose of the SDK is to read whatever files are present. If there is a schema mismatch, we should not block the output from the files that do have the correct schema. Also, in future we are going to support Merge Schema and show the output even when schemas differ. It is better to show whatever can be read with the correct schema, and to emit a warning or a log entry about the presence of a different schema.

---
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user ajantha-bhat commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r187007180

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

@kunal642: For nonTransactional tables, we support placing the output files of many SDK writers in the same folder and reading them from there. This works when the schema is the same; if the schema is different, we have to inform the user that these files are not of the same type. If we just ignore the files, how would the user know why they were ignored? Hence the exception.

---
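For context, the compareColumnSchemaList helper referenced in the quoted diff is not shown anywhere in this thread; a plausible shape, assuming it simply applies the strict per-column comparison introduced by this PR (illustrative only, not the committed implementation):

      private boolean compareColumnSchemaList(List<ColumnSchema> indexFileColumnList,
          List<ColumnSchema> tableColumnList) {
        if (indexFileColumnList.size() != tableColumnList.size()) {
          return false;
        }
        for (int i = 0; i < indexFileColumnList.size(); i++) {
          // equalsWithStrictCheck is the new ColumnSchema comparison discussed later in this thread
          if (!indexFileColumnList.get(i).equalsWithStrictCheck(tableColumnList.get(i))) {
            return false;
          }
        }
        return true;
      }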
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user kunal642 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r186995311

--- Diff: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---

@@ -2311,14 +2312,14 @@ static DataType thriftDataTyopeToWrapperDataType(
     }
   }

-  public static List getFilePathExternalFilePath(String path) {
+  public static List getFilePathExternalFilePath(String path, final String fileExtension) {
--- End diff --

This change is not required.

---
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user kunal642 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r186992244

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

I don't think throwing an exception is the correct approach. Either we should not let the user write data with a different schema, or we should infer the schema from the first index file that was written and skip any index file with a different schema while preparing splits. @sounakr @kumarvishal09 what do you think?

---
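For comparison, the "infer from the first index file and skip mismatching files" alternative described above could look roughly like this inside the quoted loop (a sketch only; the PR as written throws instead, and the LOGGER field is assumed):

          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
            // Alternative discussed here: warn and skip this index file while preparing
            // splits, instead of failing the whole read.
            LOGGER.warn("Skipping index file " + indexFile
                + " because its schema differs from the inferred table schema");
            continue;
          }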
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user kunal642 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r186992068

--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---

@@ -151,6 +154,33 @@ public CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IO
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager
         .getValidAndInvalidSegments(loadMetadataDetails, this.readCommittedScope);
+
+    // For NonTransactional table, compare the schema of all index files with inferred schema.
+    // If there is a mismatch throw exception. As all files must be of same schema.
+    if (!carbonTable.getTableInfo().isTransactionalTable()) {
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      for (Segment segment : segments.getValidSegments()) {
+        Map<String, String> indexFiles = segment.getCommittedIndexFile();
+        for (Map.Entry<String, String> indexFileEntry : indexFiles.entrySet()) {
+          Path indexFile = new Path(indexFileEntry.getKey());
+          org.apache.carbondata.format.TableInfo tableInfo = CarbonUtil.inferSchemaFromIndexFile(
+              indexFile.toString(), carbonTable.getTableName());
+          TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+              tableInfo, identifier.getDatabaseName(),
+              identifier.getTableName(),
+              identifier.getTablePath());
+          List<ColumnSchema> indexFileColumnList =
+              wrapperTableInfo.getFactTable().getListOfColumns();
+          List<ColumnSchema> tableColumnList =
+              carbonTable.getTableInfo().getFactTable().getListOfColumns();
+          if (!compareColumnSchemaList(indexFileColumnList, tableColumnList)) {
+            throw new IOException("All the files schema doesn't match. "
--- End diff --

I don't think throwing an exception is the correct approach. Either we should not let the user write data with a different schema, or we should infer the schema from the first index file that was written and skip any index file with a different schema while preparing splits.

---
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user ajantha-bhat commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r186725643

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java ---

@@ -342,6 +342,30 @@ public void setParentColumnTableRelations(
     return true;
   }
+
+  /**
+   * method to compare columnSchema,
+   * other parameters along with just column name and column data type
+   * @param obj
+   * @return
+   */
+  public boolean equalsWithStrictCheck(Object obj) {
+    if (!this.equals(obj)) {
+      return false;
+    }
+    ColumnSchema other = (ColumnSchema) obj;
+    if (!columnUniqueId.equals(other.columnUniqueId) ||
+        (isDimensionColumn != other.isDimensionColumn) ||
+        (scale != other.scale) ||
+        (precision != other.precision) ||
+        (isSortColumn != other.isSortColumn)) {
+      return false;
+    }
+    if (encodingList.size() != other.encodingList.size()) {
--- End diff --

Done, added.

---
[GitHub] carbondata pull request #2273: [CARBONDATA-2442] Fixed: Reading two sdk writ...
Github user kumarvishal09 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2273#discussion_r186696437

--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java ---

@@ -342,6 +342,30 @@ public void setParentColumnTableRelations(
     return true;
   }
+
+  /**
+   * method to compare columnSchema,
+   * other parameters along with just column name and column data type
+   * @param obj
+   * @return
+   */
+  public boolean equalsWithStrictCheck(Object obj) {
+    if (!this.equals(obj)) {
+      return false;
+    }
+    ColumnSchema other = (ColumnSchema) obj;
+    if (!columnUniqueId.equals(other.columnUniqueId) ||
+        (isDimensionColumn != other.isDimensionColumn) ||
+        (scale != other.scale) ||
+        (precision != other.precision) ||
+        (isSortColumn != other.isSortColumn)) {
+      return false;
+    }
+    if (encodingList.size() != other.encodingList.size()) {
--- End diff --

Better to check the encoding values also... this is a generic method and can be useful for other scenarios.

---
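One possible way to extend the quoted check so that it compares the individual encoding values, as suggested above, and not just the list sizes (a sketch against the fields visible in the diff, not the final committed code):

        if (encodingList.size() != other.encodingList.size()) {
          return false;
        }
        // Compare each encoding value, not only the number of encodings.
        for (int i = 0; i < encodingList.size(); i++) {
          if (!encodingList.get(i).equals(other.encodingList.get(i))) {
            return false;
          }
        }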