[CARBONDATA-1114][Tests] Fix bugs in tests in Windows env Fix bugs in tests that cause failures under the Windows environment
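The common failure mode behind most of the changes in this patch is that Windows, unlike Linux, keeps a lock on any file that still has an open handle (Java's FileOutputStream does not open with delete sharing), so delete() and renameTo() typically fail until every stream on the file is closed. This is why the patch closes the streams opened in the tests, uses unique file names for renames, and replaces the lock's FileOutputStream with a FileChannel that is closed before the lock file is deleted. The snippet below is a minimal, hypothetical sketch of the behaviour (the class name WindowsFileHandleDemo is illustrative and not part of the patch):

    import java.io.File;
    import java.io.FileOutputStream;

    public class WindowsFileHandleDemo {
      public static void main(String[] args) throws Exception {
        File file = new File("Test.carbondata");
        FileOutputStream out = new FileOutputStream(file);
        out.write("Hello World".getBytes("UTF-8"));
        // On Windows this typically prints false: the still-open handle blocks deletion,
        // so a test that deletes or renames the file before closing the stream fails.
        System.out.println("delete while stream is open: " + file.delete());
        out.close();
        // Once the handle is closed, the same call succeeds on Windows as well.
        System.out.println("delete after close: " + file.delete());
      }
    }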
This closes #1994 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/859d71c1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/859d71c1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/859d71c1 Branch: refs/heads/master Commit: 859d71c1737b6287c4f1a06f7cd9055d32ff8a99 Parents: d5396b1 Author: xuchuanyin <xuchuan...@hust.edu.cn> Authored: Sat Feb 24 21:18:17 2018 +0800 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Mar 8 22:21:11 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/datamap/dev/DataMap.java | 6 - .../core/datamap/dev/DataMapFactory.java | 2 +- .../exception/ConcurrentOperationException.java | 16 +- .../carbondata/core/locks/LocalFileLock.java | 30 ++-- .../core/metadata/PartitionMapFileStore.java | 0 .../statusmanager/SegmentStatusManager.java | 10 +- .../SegmentUpdateStatusManager.java | 1 - .../store/impl/DFSFileReaderImplUnitTest.java | 11 +- .../store/impl/FileFactoryImplUnitTest.java | 28 +++- .../filesystem/HDFSCarbonFileTest.java | 3 +- .../filesystem/LocalCarbonFileTest.java | 20 ++- datamap/examples/pom.xml | 145 +++++++---------- .../datamap/examples/MinMaxDataWriter.java | 1 - examples/flink/pom.xml | 4 +- .../carbondata/examples/FlinkExample.scala | 10 +- .../CarbonStreamSparkStreamingExample.scala | 1 - .../hadoop/api/CarbonTableInputFormat.java | 5 +- .../TestInsertAndOtherCommandConcurrent.scala | 2 +- .../StandardPartitionGlobalSortTestCase.scala | 2 +- .../exception/ProcessMetaDataException.java | 2 + .../org/apache/carbondata/api/CarbonStore.scala | 6 +- .../carbondata/spark/load/CsvRDDHelper.scala | 157 +++++++++++++++++++ .../load/DataLoadProcessBuilderOnSpark.scala | 3 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 2 +- .../carbondata/spark/util/CommonUtil.scala | 2 - .../command/carbonTableSchemaCommon.scala | 6 +- .../CarbonAlterTableCompactionCommand.scala | 3 +- .../management/CarbonCleanFilesCommand.scala | 2 +- .../CarbonDeleteLoadByIdCommand.scala | 2 +- .../CarbonDeleteLoadByLoadDateCommand.scala | 2 +- .../management/CarbonLoadDataCommand.scala | 28 ++-- .../CarbonProjectForDeleteCommand.scala | 2 +- .../CarbonProjectForUpdateCommand.scala | 2 +- .../schema/CarbonAlterTableRenameCommand.scala | 2 +- .../command/table/CarbonDropTableCommand.scala | 2 +- .../datasources/CarbonFileFormat.scala | 3 - .../BooleanDataTypesInsertTest.scala | 5 +- .../vectorreader/AddColumnTestCases.scala | 1 + .../datamap/DataMapWriterListener.java | 3 +- .../loading/model/CarbonLoadModelBuilder.java | 34 +++- .../processing/loading/model/LoadOption.java | 15 +- .../processing/merger/CarbonDataMergerUtil.java | 3 +- .../util/CarbonDataProcessorUtil.java | 3 +- .../processing/util/CarbonLoaderUtil.java | 8 + .../carbondata/lcm/locks/LocalFileLockTest.java | 2 +- .../loading/csvinput/CSVInputFormatTest.java | 1 + store/sdk/pom.xml | 2 +- .../carbondata/sdk/file/CSVCarbonWriter.java | 8 +- 48 files changed, 400 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index 02db8af..dd5507c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -38,9 +38,6 @@ public interface DataMap<T extends Blocklet> { /** * Prune the datamap with filter expression and partition information. It returns the list of * blocklets where these filters can exist. - * - * @param filterExp - * @return */ List<T> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions); @@ -48,9 +45,6 @@ public interface DataMap<T extends Blocklet> { // TODO Move this method to Abstract class /** * Validate whether the current segment needs to be fetching the required data - * - * @param filterExp - * @return */ boolean isScanRequired(FilterResolverIntf filterExp); http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index 50ac279..d8a467f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -21,8 +21,8 @@ import java.util.List; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.DataMapType; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.events.Event; http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java index 7e717ba..918268c 100644 --- a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java +++ b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java @@ -17,21 +17,10 @@ package org.apache.carbondata.core.exception; -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; - +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -/** - * This exception will be thrown when executing concurrent operations which - * is not supported in carbon. 
- * - * For example, when INSERT OVERWRITE is executing, other operations are not - * allowed, so this exception will be thrown - */ -@InterfaceAudience.User -@InterfaceStability.Stable -public class ConcurrentOperationException extends Exception { +public class ConcurrentOperationException extends MalformedCarbonCommandException { public ConcurrentOperationException(String dbName, String tableName, String command1, String command2) { @@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception { } } + http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java index 75ea074..cb80877 100644 --- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java +++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java @@ -17,17 +17,20 @@ package org.apache.carbondata.core.locks; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.channels.OverlappingFileLockException; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.util.CarbonUtil; /** * This class handles the file locking in the local file system. @@ -40,11 +43,6 @@ public class LocalFileLock extends AbstractCarbonLock { private String location; /** - * fileOutputStream of the local lock file - */ - private FileOutputStream fileOutputStream; - - /** * channel is the FileChannel of the lock file. */ private FileChannel channel; @@ -104,8 +102,8 @@ public class LocalFileLock extends AbstractCarbonLock { FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location)); } - fileOutputStream = new FileOutputStream(lockFilePath); - channel = fileOutputStream.getChannel(); + channel = FileChannel.open(Paths.get(lockFilePath), StandardOpenOption.WRITE, + StandardOpenOption.APPEND); try { fileLock = channel.tryLock(); } catch (OverlappingFileLockException e) { @@ -137,11 +135,17 @@ public class LocalFileLock extends AbstractCarbonLock { } catch (IOException e) { status = false; } finally { - if (null != fileOutputStream) { - try { - fileOutputStream.close(); - } catch (IOException e) { - LOGGER.error(e.getMessage()); + CarbonUtil.closeStreams(channel); + + // deleting the lock file after releasing the lock. 
+ if (null != lockFilePath) { + CarbonFile lockFile = FileFactory.getCarbonFile(lockFilePath, + FileFactory.getFileType(lockFilePath)); + if (!lockFile.exists() || lockFile.delete()) { + LOGGER.info("Successfully deleted the lock file " + lockFilePath); + } else { + LOGGER.error("Not able to delete the lock file " + lockFilePath); + status = false; } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index b3b1240..d76158e 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.locks.CarbonLockFactory; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; @@ -837,6 +838,13 @@ public class SegmentStatusManager { public static void deleteLoadsAndUpdateMetadata( CarbonTable carbonTable, boolean isForceDeletion) throws IOException { + deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null); + } + + public static void deleteLoadsAndUpdateMetadata( + CarbonTable carbonTable, + boolean isForceDeletion, + List<PartitionSpec> partitionSpecs) throws IOException { if (isLoadDeletionRequired(carbonTable.getMetadataPath())) { LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); @@ -881,7 +889,7 @@ public class SegmentStatusManager { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); if (updationCompletionStatus) { DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( - identifier, carbonTable.getMetadataPath(), isForceDeletion); + identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 39eb262..a21873d 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ 
-44,7 +44,6 @@ import org.apache.carbondata.core.locks.CarbonLockFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.locks.LockUsage; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.SegmentUpdateDetails; import org.apache.carbondata.core.mutate.TupleIdEnum; http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java index da61a94..30144c1 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/DFSFileReaderImplUnitTest.java @@ -38,7 +38,6 @@ import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; public class DFSFileReaderImplUnitTest { - private static DFSFileReaderImpl dfsFileHolder; private static String fileName; private static String fileNameWithEmptyContent; @@ -50,10 +49,8 @@ public class DFSFileReaderImplUnitTest { file = new File("Test.carbondata"); fileWithEmptyContent = new File("TestEXception.carbondata"); - if (!file.exists()) try { - file.createNewFile(); - } catch (IOException e) { - e.printStackTrace(); + if (file.exists()) { + file.delete(); } if (!fileWithEmptyContent.exists()) try { fileWithEmptyContent.createNewFile(); @@ -61,10 +58,12 @@ public class DFSFileReaderImplUnitTest { e.printStackTrace(); } try { - FileOutputStream of = new FileOutputStream(file, true); + FileOutputStream of = new FileOutputStream(file, false); BufferedWriter br = new BufferedWriter(new OutputStreamWriter(of, "UTF-8")); br.write("Hello World"); br.close(); + of.flush(); + of.close(); } catch (Exception e) { e.getMessage(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java index 65590d6..0e7d1c9 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastorage/filesystem/store/impl/FileFactoryImplUnitTest.java @@ -17,6 +17,7 @@ package org.apache.carbondata.core.carbon.datastorage.filesystem.store.impl; +import java.io.DataOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; @@ -43,6 +44,10 @@ public class FileFactoryImplUnitTest { @AfterClass public static void tearDown() { + cleanUp(); + } + + private static void cleanUp() { File file = new File(filePath); if (file.exists()) { file.delete(); @@ -83,17 +88,17 @@ public 
class FileFactoryImplUnitTest { } @Test public void testCreateNewFileWithDefaultFileType() throws IOException { - tearDown(); + cleanUp(); assertTrue(FileFactory.createNewFile(filePath, FileFactory.FileType.LOCAL)); } @Test public void testCreateNewLockFileWithDefaultFileType() throws IOException { - tearDown(); + cleanUp(); assertTrue(FileFactory.createNewLockFile(filePath, FileFactory.FileType.LOCAL)); } @Test public void testCreateNewLockFileWithViewFsFileType() throws IOException { - tearDown(); + cleanUp(); assertTrue(FileFactory.createNewLockFile(filePath, FileFactory.FileType.VIEWFS)); } @@ -129,20 +134,29 @@ public class FileFactoryImplUnitTest { assertTrue(FileFactory.mkdirs(filePath, FileFactory.FileType.VIEWFS)); } - @Test public void testGetDataOutputStreamUsingAppendeForException() { + @Test public void testGetDataOutputStreamUsingAppendeForException() throws IOException { + DataOutputStream outputStream = null; try { - FileFactory.getDataOutputStreamUsingAppend(filePath, FileFactory.FileType.VIEWFS); + outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath, FileFactory.FileType.VIEWFS); } catch (Exception exception) { assertEquals("Not supported", exception.getMessage()); + } finally { + if (null != outputStream) { + outputStream.close(); + } } } @Test public void getDataOutputStreamForVIEWFSType() throws IOException { - assertNotNull(FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS)); + DataOutputStream outputStream = FileFactory.getDataOutputStream(filePath, FileFactory.FileType.VIEWFS); + assertNotNull(outputStream); + outputStream.close(); } @Test public void getDataOutputStreamForLocalType() throws IOException { - assertNotNull(FileFactory.getDataOutputStream(filePath, FileFactory.FileType.LOCAL)); + DataOutputStream outputStream = FileFactory.getDataOutputStream(filePath, FileFactory.FileType.LOCAL); + assertNotNull(outputStream); + outputStream.close(); } @Test public void testGetCarbonFile() throws IOException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java index 4018123..42d4afa 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/HDFSCarbonFileTest.java @@ -58,7 +58,7 @@ public class HDFSCarbonFileTest { @BeforeClass static public void setUp() throws IOException { Configuration config = new Configuration(); -//adding local hadoop configuration + // adding local hadoop configuration config.addResource(new Path("core-site.xml")); config.addResource(new Path("hdfs-site.xml")); fileName = "Test.carbondata"; //this path is HDFS path @@ -75,6 +75,7 @@ public class HDFSCarbonFileTest { BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8")); br.write("Hello World"); br.close(); + os.close(); fs.close(); fileStatus = new FileStatus(12L, true, 60, 120l, 180L, new Path(fileName)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java index 96ef106..14f9fe2 100644 --- a/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java +++ b/core/src/test/java/org/apache/carbondata/core/datastore/filesystem/LocalCarbonFileTest.java @@ -29,6 +29,7 @@ import sun.nio.ch.FileChannelImpl; import java.io.*; import java.nio.channels.ReadableByteChannel; import java.util.Objects; +import java.util.UUID; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; @@ -44,8 +45,8 @@ public class LocalCarbonFileTest { @BeforeClass static public void setUp() { - file = new File("Test.carbondata"); - dir = new File("Testdir.carbondata"); + file = new File("TestLocalCarbonFile"); + dir = new File("TestLocalCarbonDir"); if (!file.exists()) try { file.createNewFile(); @@ -60,6 +61,7 @@ public class LocalCarbonFileTest { byte[] bytes = "core java api".getBytes(); oFile.write(bytes); + oFile.close(); } catch (FileNotFoundException e) { e.printStackTrace(); localCarbonFile = new LocalCarbonFile(file); @@ -121,8 +123,9 @@ public class LocalCarbonFileTest { @Test public void testRenameForce() { localCarbonFile = new LocalCarbonFile(file); - assertTrue(localCarbonFile.renameForce("Testdb.carbon")); - File file1 = new File("Testdb.carbon"); + String destFile = "TestRename" + UUID.randomUUID().toString(); + assertTrue(localCarbonFile.renameForce(destFile)); + File file1 = new File(destFile); if (file1.exists()) { file1.delete(); } @@ -131,7 +134,12 @@ public class LocalCarbonFileTest { @Test public void testRenameTo() { localCarbonFile = new LocalCarbonFile(file); - assertTrue(!localCarbonFile.renameTo("Testdb.carbon")); + String destFile = "TestRename" + UUID.randomUUID().toString(); + assertTrue(!localCarbonFile.renameTo(destFile)); + File file1 = new File(destFile); + if (file1.exists()) { + file1.delete(); + } } @Test @@ -463,6 +471,6 @@ public class LocalCarbonFileTest { localCarbonFile = new LocalCarbonFile("demo.txt"); - assertEquals(localCarbonFile.renameForce("Test.carbondata"), true); + assertEquals(localCarbonFile.renameForce("renameToFile"), true); } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/datamap/examples/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/examples/pom.xml b/datamap/examples/pom.xml index 0049950..8539a86 100644 --- a/datamap/examples/pom.xml +++ b/datamap/examples/pom.xml @@ -15,97 +15,70 @@ See the License for the specific language governing permissions and limitations under the License. 
--> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> + <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-parent</artifactId> - <version>1.4.0-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.4.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> - <artifactId>carbondata-datamap-examples</artifactId> - <name>Apache CarbonData :: Datamap Examples</name> + <artifactId>carbondata-datamap-examples</artifactId> + <name>Apache CarbonData :: DataMap Examples</name> - <properties> - <dev.path>${basedir}/../../dev</dev.path> - </properties> + <properties> + <dev.path>${basedir}/../../dev</dev.path> + </properties> - <dependencies> - <dependency> - <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark2</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_2.10</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.10</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-repl_${scala.binary.version}</artifactId> - </dependency> - </dependencies> + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> - <build> - <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory> - <resources> - <resource> - <directory>.</directory> - <includes> - <include>CARBON_EXAMPLESLogResource.properties</include> - </includes> - </resource> - </resources> - <plugins> - <plugin> - <groupId>org.scala-tools</groupId> - <artifactId>maven-scala-plugin</artifactId> - <version>2.15.2</version> - <executions> - <execution> - <id>compile</id> - <goals> - <goal>compile</goal> - </goals> - <phase>compile</phase> - </execution> - <execution> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <source>1.7</source> - <target>1.7</target> - </configuration> - </plugin> - </plugins> - </build> + <build> + <sourceDirectory>src/minmaxdatamap/main/java</sourceDirectory> + <resources> + <resource> + <directory>.</directory> + </resource> + </resources> + <plugins> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + 
<execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java ---------------------------------------------------------------------- diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java index 5046182..17c8332 100644 --- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java +++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java @@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.TableInfo; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; import com.google.gson.Gson; http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/examples/flink/pom.xml ---------------------------------------------------------------------- diff --git a/examples/flink/pom.xml b/examples/flink/pom.xml index 75913cf..b783435 100644 --- a/examples/flink/pom.xml +++ b/examples/flink/pom.xml @@ -52,12 +52,12 @@ </dependency> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-spark</artifactId> + <artifactId>carbondata-spark2</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.carbondata</groupId> - <artifactId>carbondata-examples-spark</artifactId> + <artifactId>carbondata-examples-spark2</artifactId> <version>${project.version}</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala ---------------------------------------------------------------------- diff --git a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala index 9ce95ae..239a038 100644 --- a/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala +++ b/examples/flink/src/main/scala/org/apache/carbondata/examples/FlinkExample.scala @@ -21,8 +21,8 @@ import org.apache.flink.api.java.ExecutionEnvironment import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job -import org.apache.carbondata.examples.util.ExampleUtils -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonProjection} +import org.apache.carbondata.hadoop.CarbonProjection +import org.apache.carbondata.hadoop.api.CarbonTableInputFormat // Write carbondata file by spark and read it by flink // scalastyle:off println @@ -30,7 +30,7 @@ object FlinkExample { def main(args: Array[String]): Unit = { // write carbondata file by spark - val cc = ExampleUtils.createCarbonContext("FlinkExample") + 
val cc = ExampleUtils.createCarbonSession("FlinkExample") val path = ExampleUtils.writeSampleCarbonFile(cc, "carbon1") // read two columns by flink @@ -38,11 +38,11 @@ object FlinkExample { projection.addColumn("c1") // column c1 projection.addColumn("c3") // column c3 val conf = new Configuration() - CarbonInputFormat.setColumnProjection(conf, projection) + CarbonTableInputFormat.setColumnProjection(conf, projection) val env = ExecutionEnvironment.getExecutionEnvironment val ds = env.readHadoopFile( - new CarbonInputFormat[Array[Object]], + new CarbonTableInputFormat[Array[Object]], classOf[Void], classOf[Array[Object]], path, http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala index 63b1c5a..856084b 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonStreamSparkStreamingExample.scala @@ -20,7 +20,6 @@ package org.apache.carbondata.examples import java.io.{File, PrintWriter} import java.net.ServerSocket -import org.apache.hadoop.conf.Configuration import org.apache.spark.rdd.RDD import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.CarbonSparkStreamingFactory http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 5cebc12..f629d40 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -33,8 +33,8 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.DataMapType; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -498,7 +498,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); for (Segment segment : streamSegments) { - String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); + String segmentDir = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), segment.getSegmentNo()); FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); if (FileFactory.isFileExist(segmentDir, fileType)) { String indexName = CarbonTablePath.getCarbonStreamIndexFileName(); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index b39c44c..3f0ca42 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory // This testsuite test insert and insert overwrite with other commands concurrently http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala index b511ee8..6e6be68 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala @@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll { var executorService: ExecutorService = _ http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java 
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java index 3e06bde..471b645 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.exception; +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; + // This exception will be thrown when processMetaData failed in // Carbon's RunnableCommand public class ProcessMetaDataException extends MalformedCarbonCommandException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index b69ec37..bfb1616 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.spark.util.DataLoadingUtil object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -139,9 +138,8 @@ object CarbonStore { carbonCleanFilesLock = CarbonLockUtil .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) - SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull) + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + carbonTable, true, currentTablePartitions.map(_.asJava).orNull) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) currentTablePartitions match { case Some(partitions) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala new file mode 100644 index 0000000..36d8c51 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.load + +import java.text.SimpleDateFormat +import java.util.{Date, Locale} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} +import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.util.SparkSQLUtil.sessionState + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.rdd.SerializableConfiguration +import org.apache.carbondata.spark.util.CommonUtil + +object CsvRDDHelper { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * createsw a RDD that does reading of multiple CSV files + */ + def csvFileScanRDD( + spark: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration + ): RDD[InternalRow] = { + // 1. 
partition + val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes + val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes + val defaultParallelism = spark.sparkContext.defaultParallelism + CommonUtil.configureCSVInputFormat(hadoopConf, model) + hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val jobContext = new JobContextImpl(jobConf, null) + val inputFormat = new CSVInputFormat() + val rawSplits = inputFormat.getSplits(jobContext).toArray + val splitFiles = rawSplits.map { split => + val fileSplit = split.asInstanceOf[FileSplit] + PartitionedFile( + InternalRow.empty, + fileSplit.getPath.toString, + fileSplit.getStart, + fileSplit.getLength, + fileSplit.getLocations) + }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + val totalBytes = splitFiles.map(_.length + openCostInBytes).sum + val bytesPerCore = totalBytes / defaultParallelism + + val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + val newPartition = + FilePartition( + partitions.size, + currentFiles.toArray.toSeq) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + splitFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file + } + closePartition() + + // 2. 
read function + val serializableConfiguration = new SerializableConfiguration(jobConf) + val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { + override def apply(file: PartitionedFile): Iterator[InternalRow] = { + new Iterator[InternalRow] { + val hadoopConf = serializableConfiguration.value + val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) + formatter.format(new Date()) + } + val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) + val inputSplit = + new FileSplit(new Path(file.filePath), file.start, file.length, file.locations) + var finished = false + val inputFormat = new CSVInputFormat() + val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext) + reader.initialize(inputSplit, hadoopAttemptContext) + + override def hasNext: Boolean = { + if (!finished) { + if (reader != null) { + if (reader.nextKeyValue()) { + true + } else { + finished = true + reader.close() + false + } + } else { + finished = true + false + } + } else { + false + } + } + + override def next(): InternalRow = { + new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]]) + } + } + } + } + new FileScanRDD(spark, readFunction, partitions) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index e1bd84b..1062cd7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -34,7 +34,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.util.DataLoadingUtil /** * Use sortBy operator in spark to load the data @@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark { } else { // input data from files val columnCount = model.getCsvHeaderColumns.length - DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf) + CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf) .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 773ea16..59ed8ba 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel} import org.apache.spark.sql.types._ +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogService import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} @@ -46,7 +47,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object CarbonScalaUtil { def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 9104a32..d3093fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -816,8 +816,6 @@ object CommonUtil { val carbonTable = CarbonMetadata.getInstance .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable, null) } catch { case _: Exception => LOGGER.warn(s"Error while cleaning table " + http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 71ce2c6..3c21af3 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.CarbonTableIdentifier -import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} import 
org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema._ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema} import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} -import org.apache.carbondata.core.service.CarbonCommonFactory import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.DataTypeUtil -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CompactionType http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index e47c500..2f4aa30 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -21,7 +21,7 @@ import java.io.{File, IOException} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession} +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel} @@ -47,7 +47,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD import org.apache.carbondata.streaming.segment.StreamSegment http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index d2adc57..2092028 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import 
org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala index 0861c63..81427a1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException case class CarbonDeleteLoadByIdCommand( loadIds: Seq[String], http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala index dcbc6ce..1d76bda 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException case class CarbonDeleteLoadByLoadDateCommand( databaseNameOp: Option[String], 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 70134a6..eb00ebf 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.statusmanager.SegmentStatus -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException @@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException -import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} -import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil} -import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark +import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} case class CarbonLoadDataCommand( databaseNameOp: Option[String], @@ -193,12 +189,18 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) 
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) + + val javaPartition = mutable.Map[String, String]() + partition.foreach { case (k, v) => + if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get + } + new CarbonLoadModelBuilder(table).build( options.asJava, optionsFinal, carbonLoadModel, hadoopConf, - partition, + javaPartition.asJava, dataFrame.isDefined) // Delete stale segment folders that are not in table status but are physically present in // the Fact folder @@ -231,11 +233,7 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") // Clean up the old invalid segment data before creating a new entry for new load. - SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = false, - table, - currPartitions) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( @@ -679,7 +677,7 @@ case class CarbonLoadDataCommand( } } val columnCount = carbonLoadModel.getCsvHeaderColumns.length - val rdd = DataLoadingUtil.csvFileScanRDD( + val rdd = CsvRDDHelper.csvFileScanRDD( sparkSession, model = carbonLoadModel, hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index f074285..230378b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 5165342..2a92478 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.ArrayType import org.apache.spark.storage.StorageLevel -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index cf77e0f..2503fc3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -25,12 +25,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.util.AlterTableUtil -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 8001a93..0298eea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import 
org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ -import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException} case class CarbonDropTableCommand( ifExistsSet: Boolean, http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 61a31a5..2eed988 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} class CarbonFileFormat extends FileFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala index f8cfa6b..45edd3d 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/booleantype/BooleanDataTypesInsertTest.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.util.CarbonProperties class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll { @@ -555,7 +556,7 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with sql( s""" - | LOAD DATA LOCAL INPATH '${storeLocation}' + | LOAD DATA LOCAL INPATH '${FileFactory.getUpdatedFilePath(storeLocation)}' | INTO TABLE 
hive_table """.stripMargin) @@ -923,7 +924,7 @@ class BooleanDataTypesInsertTest extends QueryTest with BeforeAndAfterEach with sql( s""" - | LOAD DATA LOCAL INPATH '${storeLocation}' + | LOAD DATA LOCAL INPATH '${FileFactory.getUpdatedFilePath(storeLocation)}' | INTO TABLE hive_table """.stripMargin) http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index 995f041..d94570a 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 1104229..66f8bc5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -74,7 +74,8 @@ public class DataMapWriterListener { } List<String> columns = factory.getMeta().getIndexedColumns(); List<AbstractDataMapWriter> writers = registry.get(columns); - AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath); + AbstractDataMapWriter writer = factory.createWriter( + new Segment(segmentId, null), dataWritePath); if (writers != null) { writers.add(writer); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 17e8dbe..29dfa40 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder { 
Map<String, String> optionsFinal, CarbonLoadModel carbonLoadModel, Configuration hadoopConf) throws InvalidLoadOptionException, IOException { + build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false); + } + + /** + * build CarbonLoadModel for data loading + * @param options Load options from user input + * @param optionsFinal Load options that populated with default values for optional options + * @param carbonLoadModel The output load model + * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in + * user provided load options + * @param partitions partition name map to path + * @param isDataFrame true if build for load for dataframe + */ + public void build( + Map<String, String> options, + Map<String, String> optionsFinal, + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf, + Map<String, String> partitions, + boolean isDataFrame) throws InvalidLoadOptionException, IOException { carbonLoadModel.setTableName(table.getTableName()); carbonLoadModel.setDatabaseName(table.getDatabaseName()); carbonLoadModel.setTablePath(table.getTablePath()); @@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder { carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)); carbonLoadModel.setCsvHeader(fileHeader); carbonLoadModel.setColDictFilePath(column_dict); + + List<String> ignoreColumns = new ArrayList<>(); + if (!isDataFrame) { + for (Map.Entry<String, String> partition : partitions.entrySet()) { + if (partition.getValue() != null) { + ignoreColumns.add(partition.getKey()); + } + } + } + carbonLoadModel.setCsvHeaderColumns( - LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf)); + LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns)); int validatedMaxColumns = validateMaxColumns( carbonLoadModel.getCsvHeaderColumns(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 5af4859..bac1a94 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.carbondata.common.Maps; @@ -201,6 +203,16 @@ public class LoadOption { public static String[] getCsvHeaderColumns( CarbonLoadModel carbonLoadModel, Configuration hadoopConf) throws IOException { + return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>()); + } + + /** + * Return CSV header field names, with partition column + */ + public static String[] getCsvHeaderColumns( + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf, + List<String> staticPartitionCols) throws IOException { String delimiter; if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) { delimiter = CarbonCommonConstants.COMMA; @@ -231,7 +243,7 @@ public class LoadOption { } if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns, - carbonLoadModel.getCarbonDataLoadSchema())) { + 
carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) { if (csvFile == null) { LOG.error("CSV header in DDL is not proper." + " Column names in schema and CSV header are not the same."); @@ -249,4 +261,5 @@ public class LoadOption { } return csvColumns; } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 1ab803b..438e7db 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -1013,7 +1013,8 @@ public final class CarbonDataMergerUtil { CarbonFile[] updateDeltaFiles = null; Set<String> uniqueBlocks = new HashSet<String>(); - String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg.getSegmentNo()); + String segmentPath = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), seg.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index efd715c..ba2b0c2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -392,7 +392,8 @@ public final class CarbonDataProcessorUtil { * * @return data directory path */ - public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) { + public static String createCarbonStoreLocation(String databaseName, String tableName, + String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/859d71c1/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 38e5698..a948538 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -16,9 +16,13 @@ */ package org.apache.carbondata.processing.util; +import java.io.BufferedWriter; +import java.io.DataOutputStream; import java.io.IOException; +import java.io.OutputStreamWriter; import java.net.InetAddress; import java.net.UnknownHostException; +import java.nio.charset.Charset; import java.util.*; import 
org.apache.carbondata.common.logging.LogService; @@ -36,6 +40,9 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.datastore.impl.FileFactory.FileType; +import org.apache.carbondata.core.fileoperations.AtomicFileOperations; +import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; @@ -55,6 +62,7 @@ import org.apache.carbondata.processing.merger.NodeMultiBlockRelation; import static org.apache.carbondata.core.enums.EscapeSequences.*; +import com.google.gson.Gson; public final class CarbonLoaderUtil {
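
Note on the partition handling introduced above: the CarbonLoadDataCommand and CarbonLoadModelBuilder hunks change how partition values reach the load model. The Scala command now flattens its Map[String, Option[String]] partition spec into a Java Map[String, String] (dynamic partitions become null) before calling the new six-argument build, and the builder collects the statically-valued partition columns so that LoadOption.getCsvHeaderColumns can exclude them from CSV header validation. The sketch below is a minimal, self-contained Scala rendering of that conversion; PartitionSpecConversionSketch, toJavaPartitionMap, deriveIgnoreColumns and the sample partitionSpec are illustrative names only and are not part of this patch.

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    object PartitionSpecConversionSketch {

      // Mirrors the conversion added in CarbonLoadDataCommand: static partitions keep
      // their value, dynamic partitions (None) are passed to the Java builder as null.
      def toJavaPartitionMap(
          partition: Map[String, Option[String]]): java.util.Map[String, String] = {
        val javaPartition = mutable.Map[String, String]()
        partition.foreach { case (col, value) =>
          javaPartition(col) = value.orNull
        }
        javaPartition.asJava
      }

      // Mirrors the ignore-column derivation added in CarbonLoadModelBuilder.build:
      // for plain file loads (not dataframe loads), columns that carry a static
      // partition value are skipped when the CSV header is validated.
      def deriveIgnoreColumns(
          partitions: java.util.Map[String, String],
          isDataFrame: Boolean): java.util.List[String] = {
        val ignore = new java.util.ArrayList[String]()
        if (!isDataFrame) {
          partitions.asScala.foreach { case (col, value) =>
            if (value != null) ignore.add(col)
          }
        }
        ignore
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical partition spec: 'country' is static, 'city' is dynamic.
        val partitionSpec = Map("country" -> Some("cn"), "city" -> None)
        val javaPartition = toJavaPartitionMap(partitionSpec)
        println(javaPartition)                                   // e.g. {country=cn, city=null}
        println(deriveIgnoreColumns(javaPartition, isDataFrame = false)) // [country]
      }
    }

Passing null rather than Option across the Scala/Java boundary is what the new build(options, optionsFinal, carbonLoadModel, hadoopConf, partitions, isDataFrame) signature expects, since a java.util.Map cannot hold Scala Option values and the builder distinguishes static from dynamic partitions by null. The related test changes above (wrapping storeLocation in FileFactory.getUpdatedFilePath before embedding it in LOAD DATA LOCAL INPATH) presumably normalize Windows-style paths so the quoted path in the SQL statement stays valid; the exact normalization is internal to FileFactory and is not reproduced here.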