[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/incubator-carbondata/pull/263




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87218055
  
--- Diff: examples/src/main/scala/org/apache/carbondata/examples/CarbonExample1.scala ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
+
+object CarbonExample1 {
--- End diff --

This file was added by mistake.




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87217857
  
--- Diff: conf/ss.txt ---
@@ -0,0 +1,122 @@
+
+Release Notes - CarbonData - Version 0.1.0-incubating
--- End diff --

Why is this added?




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87172561
  
--- Diff: integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---
@@ -215,6 +227,105 @@ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
        info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
  }
 
+  public static void executeNewDataLoad(CarbonLoadModel loadModel, String storeLocation,
+      String hdfsStoreLocation, RecordReader[] recordReaders) throws Exception {
+    if (!new File(storeLocation).mkdirs()) {
+      LOGGER.error("Error while creating the temp store path: " + storeLocation);
+    }
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, hdfsStoreLocation);
+    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    String csvHeader = loadModel.getCsvHeader();
+    String csvFileName = null;
+    if (csvHeader != null && !csvHeader.isEmpty()) {
+      configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
+    } else {
+      CarbonFile csvFile =
+          CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
+      csvFileName = csvFile.getName();
+      csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
+      configuration.setHeader(
+          CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
+    }
+    CarbonDataProcessorUtil
+        .validateHeader(loadModel.getTableName(), csvHeader, loadModel.getCarbonDataLoadSchema(),
+            loadModel.getCsvDelimiter(), csvFileName);
+
+    configuration.setPartitionId(loadModel.getPartitionId());
+    configuration.setSegmentId(loadModel.getSegmentId());
+    configuration.setTaskNo(loadModel.getTaskNo());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,
+        new String[] { loadModel.getComplexDelimiterLevel1(),
+            loadModel.getComplexDelimiterLevel2() });
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT,
+        loadModel.getSerializationNullFormat().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_TIME_STAMP,
+        loadModel.getFactTimeStamp());
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE,
+        loadModel.getBadRecordsLoggerEnable().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION,
+        loadModel.getBadRecordsAction().split(",")[1]);
+    configuration.setDataLoadProperty(DataLoadProcessorConstants.FACT_FILE_PATH,
+        loadModel.getFactFilePath());
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    Map<String, String> dateFormatMap =
+        CarbonDataProcessorUtil.getDateFormatMap(loadModel.getDateFormat());
+    List<DataField> dataFields = new ArrayList<>();
+    List<DataField> complexDataFields = new ArrayList<>();
+
+    // First add dictionary and non dictionary dimensions because these are part of mdk key.
+    // And then add complex data types and measures.
+    for (CarbonColumn column : dimensions) {
+      DataField dataField = new DataField(column);
+      dataField.setDateFormat(dateFormatMap.get(column.getColName()));
+      if (column.isComplex()) {
+        complexDataFields.add(dataField);
+      } else {
+        dataFields.add(dataField);
+      }
+    }
+    dataFields.addAll(complexDataFields);
+    for (CarbonColumn column : measures) {
+      // This dummy measure is added when no measure was present. We no need to load it.
+      if (!(column.getColNam

[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87171862
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java ---
@@ -43,35 +45,58 @@
 
   private CarbonDataLoadConfiguration configuration;
 
+  private DataField[] fields;
+
   private FieldConverter[] fieldConverters;
 
-  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration) {
+  private BadRecordslogger badRecordLogger;
+
+  private BadRecordLogHolder logHolder;
+
+  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
+      BadRecordslogger badRecordLogger) {
+    this.fields = fields;
     this.configuration = configuration;
+    this.badRecordLogger = badRecordLogger;
+  }
+
+  @Override
+  public void initialize() {
     CacheProvider cacheProvider = CacheProvider.getInstance();
     Cache cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
         configuration.getTableIdentifier().getStorePath());
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
     List<FieldConverter> fieldConverterList = new ArrayList<>();
 
     long lruCacheStartTime = System.currentTimeMillis();
 
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i);
-      if (fieldConverter != null) {
-        fieldConverterList.add(fieldConverter);
-      }
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat);
+      fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
     fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    logHolder = new BadRecordLogHolder();
   }
 
   @Override
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+    CarbonRow copy = row.getCopy();
--- End diff --

It is required because the fieldConverters update the row in place; if a bad record
is found in the last column, we don't want to hand already-converted data to the logger.
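A minimal sketch of that reasoning, with hypothetical stand-ins for the converter and logger (the real classes take more collaborators): because conversion mutates the row array in place, only a copy taken before the first converter runs can be logged verbatim when a late column fails.

```java
import java.util.Arrays;

// Sketch only: this FieldConverter stands in for the real interface, and
// convert() mutates the row in place the way the real field converters do.
interface FieldConverter {
  void convert(Object[] row);
}

class RowConvertSketch {
  static Object[] convert(Object[] row, FieldConverter[] converters) {
    // Copy before any converter runs, so a failure in the *last* column can
    // still log the original, unconverted values.
    Object[] original = Arrays.copyOf(row, row.length);
    for (FieldConverter converter : converters) {
      try {
        converter.convert(row);
      } catch (RuntimeException badRecord) {
        System.err.println("bad record: " + Arrays.toString(original));
        return null; // row rejected
      }
    }
    return row;
  }
}
```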




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87171880
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---
@@ -102,21 +102,14 @@ public void initialize(SortParameters sortParameters) {
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length);
 
-    // First prepare the data for sort.
-    Iterator[] sortPrepIterators = new Iterator[iterators.length];
-    for (int i = 0; i < sortPrepIterators.length; i++) {
-      sortPrepIterators[i] = new SortPreparatorIterator(iterators[i], inputDataFields);
-    }
-
-    for (int i = 0; i < sortDataRows.length; i++) {
-      executorService
-          .submit(new SortIteratorThread(sortPrepIterators[i], sortDataRows[i], sortParameters));
-    }
-
     try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        executorService
+            .submit(new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87171144
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java ---
@@ -110,7 +116,11 @@ public IntermediateFileMerger(SortParameters mergerParameters, File[] intermedia
       initialize();
 
       while (hasNext()) {
-        writeDataTofile(next());
+        if (useKettle) {
--- End diff --

Ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87171167
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---
@@ -50,6 +50,7 @@ public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
+    super.initialize();
--- End diff --

Ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87171187
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---
@@ -47,20 +58,109 @@ public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
-    encoder = new RowConverterImpl(child.getOutput(), configuration);
-    child.initialize();
+    super.initialize();
+    BadRecordslogger badRecordLogger = createBadRecordLogger();
+    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converter.initialize();
+  }
+
+  /**
+   * Create the iterator using child iterator.
+   *
+   * @param childIter
+   * @return new iterator with step specific processing.
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter = converter.createCopyForNewThread();
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Process the batch of rows as per the step logic.
+   *
+   * @param rowBatch
+   * @return processed row.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      newBatch.addRow(localConverter.convert(batchIterator.next()));
+    }
+    return newBatch;
   }
 
   @Override
   protected CarbonRow processRow(CarbonRow row) {
-    return encoder.convert(row);
+    // Not implemented
+    return null;
+  }
+
+  private BadRecordslogger createBadRecordLogger() {
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object bad_records_action =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+            .toString();
+    if (null != bad_records_action) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String key = identifier.getDatabaseName() + '/' + identifier.getTableName() + '_'
+        + identifier.getTableName();
+    BadRecordslogger badRecordslogger =
+        new BadRecordslogger(key, identifier.getTableName() + '_' + System.currentTimeMillis(),
+            getBadLogStoreLocation(
+                identifier.getDatabaseName() + '/' + identifier.getTableName() + "/" + configuration
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170923
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---
@@ -122,6 +116,11 @@
 
   private int numberOfCores;
 
+  /**
+   * Temporary conf , it will be removed after refactor.
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170873
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java ---
@@ -81,13 +81,24 @@
*/
   private String taskKey;
 
+  private boolean badRecordsLogRedirect;
+
+  private boolean badRecordLoggerEnable;
+
+  private boolean badRecordConvertNullDisable;
+
   // private final Object syncObject =new Object();
 
-  public BadRecordslogger(String key, String fileName, String storePath) {
+  public BadRecordslogger(String key, String fileName, String storePath,
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170942
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java ---
@@ -264,6 +277,72 @@ private void writeData(Object[][] recordHolderList, int entryCountLocal, File fi
     }
   }
 
+  private void writeDataWithOutKettle(Object[][] recordHolderList, int entryCountLocal, File file)
+      throws CarbonSortKeyAndGroupByException {
+    DataOutputStream stream = null;
+    try {
+      // open stream
+      stream = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file),
+          parameters.getFileWriteBufferSize()));
+
+      // write number of entries to the file
+      stream.writeInt(entryCountLocal);
+      int complexDimColCount = parameters.getComplexDimColCount();
+      int dimColCount = parameters.getDimColCount() + complexDimColCount;
+      char[] aggType = parameters.getAggType();
+      boolean[] noDictionaryDimnesionMapping = parameters.getNoDictionaryDimnesionColumn();
+      Object[] row = null;
+      for (int i = 0; i < entryCountLocal; i++) {
+        // get row from record holder list
+        row = recordHolderList[i];
+        int dimCount = 0;
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170892
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java ---
@@ -136,6 +136,9 @@
*/
   private boolean[] isNoDictionaryDimensionColumn;
 
+  // temporary configuration
+  private boolean useKettle;
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170828
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---
@@ -396,4 +407,223 @@ private static void addAllComplexTypeChildren(CarbonDimension dimension, StringB
     }
     return complexTypesMap;
   }
+
+  /**
+   * Get the csv file to read if it the path is file otherwise get the first file of directory.
+   *
+   * @param csvFilePath
+   * @return File
+   */
+  public static CarbonFile getCsvFileToRead(String csvFilePath) {
+    CarbonFile csvFile =
+        FileFactory.getCarbonFile(csvFilePath, FileFactory.getFileType(csvFilePath));
+
+    CarbonFile[] listFiles = null;
+    if (csvFile.isDirectory()) {
+      listFiles = csvFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile pathname) {
+          if (!pathname.isDirectory()) {
+            if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
+                .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
+                    + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
+              return true;
+            }
+          }
+
+          return false;
+        }
+      });
+    } else {
+      listFiles = new CarbonFile[1];
+      listFiles[0] = csvFile;
+
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170805
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---
@@ -396,4 +407,223 @@ private static void addAllComplexTypeChildren(CarbonDimension dimension, StringB
     }
     return complexTypesMap;
   }
+
+  /**
+   * Get the csv file to read if it the path is file otherwise get the first file of directory.
+   *
+   * @param csvFilePath
+   * @return File
+   */
+  public static CarbonFile getCsvFileToRead(String csvFilePath) {
+    CarbonFile csvFile =
+        FileFactory.getCarbonFile(csvFilePath, FileFactory.getFileType(csvFilePath));
+
+    CarbonFile[] listFiles = null;
+    if (csvFile.isDirectory()) {
+      listFiles = csvFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile pathname) {
+          if (!pathname.isDirectory()) {
+            if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
+                .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
+                    + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
+              return true;
+            }
+          }
+
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170789
  
--- Diff: integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---
@@ -215,6 +227,105 @@ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
        info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
  }
 
+  public static void executeNewDataLoad(CarbonLoadModel loadModel, String storeLocation,
+      String hdfsStoreLocation, RecordReader[] recordReaders) throws Exception {
+    if (!new File(storeLocation).mkdirs()) {
+      LOGGER.error("Error while creating the temp store path: " + storeLocation);
+    }
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, hdfsStoreLocation);
+    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    String csvHeader = loadModel.getCsvHeader();
+    String csvFileName = null;
+    if (csvHeader != null && !csvHeader.isEmpty()) {
+      configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
+    } else {
+      CarbonFile csvFile =
+          CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
+      csvFileName = csvFile.getName();
+      csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
+      configuration.setHeader(
+          CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
+    }
+    CarbonDataProcessorUtil
+        .validateHeader(loadModel.getTableName(), csvHeader, loadModel.getCarbonDataLoadSchema(),
+            loadModel.getCsvDelimiter(), csvFileName);
--- End diff --

Ok. Modified.




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170848
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java ---
@@ -952,10 +951,9 @@ private String getCarbonLocalBaseStoreLocation() {
     // In that case it will have first value empty and other values will be null
     // So If records is coming like this then we need to write this records as a bad Record.
 
-    if (null == r[0] && badRecordConvertNullDisable) {
+    if (null == r[0] && badRecordslogger.isBadRecordConvertNullDisable()) {
       badRecordslogger
-          .addBadRecordsToBuilder(r, "Column Names are coming NULL", "null",
-              badRecordsLogRedirect, badRecordsLoggerEnable);
+          .addBadRecordsToBuilder(r, "Column Names are coming NULL", "null");
--- End diff --

ok




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-09 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87170688
  
--- Diff: integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---
@@ -215,6 +227,105 @@ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
        info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
  }
 
+  public static void executeNewDataLoad(CarbonLoadModel loadModel, String storeLocation,
+      String hdfsStoreLocation, RecordReader[] recordReaders) throws Exception {
+    if (!new File(storeLocation).mkdirs()) {
+      LOGGER.error("Error while creating the temp store path: " + storeLocation);
+    }
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, hdfsStoreLocation);
+    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    String csvHeader = loadModel.getCsvHeader();
+    String csvFileName = null;
+    if (csvHeader != null && !csvHeader.isEmpty()) {
+      configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
+    } else {
+      CarbonFile csvFile =
+          CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
--- End diff --

The CSV file can also exist in HDFS; that's why we create a `CarbonFile`.
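A short usage sketch of that point: `FileFactory` resolves the backing file system from the path, so the same lookup used in the quoted diff serves local and HDFS inputs alike. The paths below are made up, and the import packages are assumed from the 0.x incubator tree.

```java
// Assumed package paths from the 0.x incubator tree; the two factory calls
// themselves appear verbatim in the quoted diff.
import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;

public class CsvPathSketch {
  public static void main(String[] args) {
    // The same call works for a local path and an HDFS URI (both made up);
    // getFileType() selects the matching CarbonFile implementation.
    for (String path : new String[] {
        "/tmp/sample.csv",
        "hdfs://namenode:8020/data/sample.csv"}) {
      CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
      System.out.println(file.getName() + " isDirectory=" + file.isDirectory());
    }
  }
}
```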




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87125387
  
--- Diff: integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---
@@ -215,6 +227,105 @@ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
        info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
  }
 
+  public static void executeNewDataLoad(CarbonLoadModel loadModel, String storeLocation,
+      String hdfsStoreLocation, RecordReader[] recordReaders) throws Exception {
+    if (!new File(storeLocation).mkdirs()) {
+      LOGGER.error("Error while creating the temp store path: " + storeLocation);
+    }
+    CarbonDataLoadConfiguration configuration = new CarbonDataLoadConfiguration();
+    String databaseName = loadModel.getDatabaseName();
+    String tableName = loadModel.getTableName();
+    String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
+        + CarbonCommonConstants.UNDERSCORE + loadModel.getTaskNo();
+    CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, hdfsStoreLocation);
+    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
+    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
+
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+    configuration.setTableIdentifier(identifier);
+    String csvHeader = loadModel.getCsvHeader();
+    String csvFileName = null;
+    if (csvHeader != null && !csvHeader.isEmpty()) {
+      configuration.setHeader(CarbonDataProcessorUtil.getColumnFields(csvHeader, ","));
+    } else {
+      CarbonFile csvFile =
+          CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
--- End diff --

A CSV file is not a `CarbonFile`; here we just need to pass the file path string to validateHeader, right?




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87124965
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---
@@ -396,4 +407,223 @@ private static void addAllComplexTypeChildren(CarbonDimension dimension, StringB
     }
     return complexTypesMap;
   }
+
+  /**
+   * Get the csv file to read if it the path is file otherwise get the first file of directory.
+   *
+   * @param csvFilePath
+   * @return File
+   */
+  public static CarbonFile getCsvFileToRead(String csvFilePath) {
+    CarbonFile csvFile =
+        FileFactory.getCarbonFile(csvFilePath, FileFactory.getFileType(csvFilePath));
+
+    CarbonFile[] listFiles = null;
+    if (csvFile.isDirectory()) {
+      listFiles = csvFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile pathname) {
+          if (!pathname.isDirectory()) {
+            if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
+                .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
+                    + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
+              return true;
+            }
+          }
+
--- End diff --

remove empty line




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87124860
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java ---
@@ -952,10 +951,9 @@ private String getCarbonLocalBaseStoreLocation() {
     // In that case it will have first value empty and other values will be null
     // So If records is coming like this then we need to write this records as a bad Record.
 
-    if (null == r[0] && badRecordConvertNullDisable) {
+    if (null == r[0] && badRecordslogger.isBadRecordConvertNullDisable()) {
       badRecordslogger
-          .addBadRecordsToBuilder(r, "Column Names are coming NULL", "null",
-              badRecordsLogRedirect, badRecordsLoggerEnable);
+          .addBadRecordsToBuilder(r, "Column Names are coming NULL", "null");
--- End diff --

All callers of `addBadRecordsToBuilder` pass `"null"` as the last parameter; it can be removed.




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87124691
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/surrogatekeysgenerator/csvbased/BadRecordslogger.java ---
@@ -81,13 +81,24 @@
*/
   private String taskKey;
 
+  private boolean badRecordsLogRedirect;
+
+  private boolean badRecordLoggerEnable;
+
+  private boolean badRecordConvertNullDisable;
+
   // private final Object syncObject =new Object();
 
-  public BadRecordslogger(String key, String fileName, String storePath) {
+  public BadRecordslogger(String key, String fileName, String storePath,
--- End diff --

The class name is not correct; please rename it to `BadRecordsLogger`.




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87124544
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java ---
@@ -122,6 +116,11 @@
 
   private int numberOfCores;
 
+  /**
+   * Temporary conf , it will be removed after refactor.
--- End diff --

add a TODO tag




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87123603
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---
@@ -47,20 +58,109 @@ public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
-    encoder = new RowConverterImpl(child.getOutput(), configuration);
-    child.initialize();
+    super.initialize();
+    BadRecordslogger badRecordLogger = createBadRecordLogger();
+    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converter.initialize();
+  }
+
+  /**
+   * Create the iterator using child iterator.
+   *
+   * @param childIter
+   * @return new iterator with step specific processing.
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter = converter.createCopyForNewThread();
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Process the batch of rows as per the step logic.
+   *
+   * @param rowBatch
+   * @return processed row.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      newBatch.addRow(localConverter.convert(batchIterator.next()));
+    }
+    return newBatch;
   }
 
   @Override
   protected CarbonRow processRow(CarbonRow row) {
-    return encoder.convert(row);
+    // Not implemented
+    return null;
+  }
+
+  private BadRecordslogger createBadRecordLogger() {
+    boolean badRecordsLogRedirect = false;
+    boolean badRecordConvertNullDisable = false;
+    boolean badRecordsLoggerEnable = Boolean.parseBoolean(
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE)
+            .toString());
+    Object bad_records_action =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION)
+            .toString();
+    if (null != bad_records_action) {
+      LoggerAction loggerAction = null;
+      try {
+        loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase());
+      } catch (IllegalArgumentException e) {
+        loggerAction = LoggerAction.FORCE;
+      }
+      switch (loggerAction) {
+        case FORCE:
+          badRecordConvertNullDisable = false;
+          break;
+        case REDIRECT:
+          badRecordsLogRedirect = true;
+          badRecordConvertNullDisable = true;
+          break;
+        case IGNORE:
+          badRecordsLogRedirect = false;
+          badRecordConvertNullDisable = true;
+          break;
+      }
+    }
+    CarbonTableIdentifier identifier =
+        configuration.getTableIdentifier().getCarbonTableIdentifier();
+    String key = identifier.getDatabaseName() + '/' + identifier.getTableName() + '_' + identifier
--- End diff --

Move this string concatenation into `CarbonTableIdentifier` as a utility function.
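A hypothetical sketch of that suggestion; `getBadRecordLoggerKey()` is a made-up name, and this standalone class only mirrors the two getters the quoted diff uses.

```java
class CarbonTableIdentifierSketch {
  private final String databaseName;
  private final String tableName;

  CarbonTableIdentifierSketch(String databaseName, String tableName) {
    this.databaseName = databaseName;
    this.tableName = tableName;
  }

  // One shared definition of the key instead of ad-hoc concatenation at each
  // call site (which also makes oddities such as the doubled table name in
  // the quoted diff easier to spot and fix).
  String getBadRecordLoggerKey() {
    return databaseName + '/' + tableName + '_' + tableName;
  }
}
```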




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87123336
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---
@@ -47,20 +58,109 @@ public DataConverterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
 
   @Override
   public void initialize() throws CarbonDataLoadingException {
-    encoder = new RowConverterImpl(child.getOutput(), configuration);
-    child.initialize();
+    super.initialize();
+    BadRecordslogger badRecordLogger = createBadRecordLogger();
+    converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger);
+    converter.initialize();
+  }
+
+  /**
+   * Create the iterator using child iterator.
+   *
+   * @param childIter
+   * @return new iterator with step specific processing.
+   */
+  @Override
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      RowConverter localConverter = converter.createCopyForNewThread();
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next(), localConverter);
+      }
+    };
+  }
+
+  /**
+   * Process the batch of rows as per the step logic.
+   *
+   * @param rowBatch
+   * @return processed row.
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      newBatch.addRow(localConverter.convert(batchIterator.next()));
+    }
+    return newBatch;
   }
 
   @Override
   protected CarbonRow processRow(CarbonRow row) {
-    return encoder.convert(row);
+    // Not implemented
+    return null;
--- End diff --

throw UnsupportedOperationException here?
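A minimal sketch of that suggestion, as a drop-in body for the `processRow` method quoted above (this reflects the review comment, not the merged code):

```java
@Override
protected CarbonRow processRow(CarbonRow row) {
  // Batches are converted in processRowBatch(); make an accidental call to
  // the per-row path fail loudly instead of silently returning null.
  throw new UnsupportedOperationException(
      "DataConverterProcessorStepImpl converts row batches, not single rows");
}
```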




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87122653
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/parser/impl/RowParserImpl.java ---
@@ -18,22 +18,80 @@
  */
 package org.apache.carbondata.processing.newflow.parser.impl;
 
+import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.newflow.DataField;
+import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.newflow.parser.CarbonParserFactory;
 import org.apache.carbondata.processing.newflow.parser.GenericParser;
 import org.apache.carbondata.processing.newflow.parser.RowParser;
 
 public class RowParserImpl implements RowParser {
 
   private GenericParser[] genericParsers;
 
-  public RowParserImpl(GenericParser[] genericParsers) {
-    this.genericParsers = genericParsers;
+  private int[] outputMapping;
+
+  private int[] inputMapping;
+
+  private int numberOfColumns;
+
+  public RowParserImpl(DataField[] output, CarbonDataLoadConfiguration configuration) {
+    String[] complexDelimiters =
+        (String[]) configuration.getDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS);
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
+    DataField[] input = getInput(configuration);
+    genericParsers = new GenericParser[input.length];
+    for (int i = 0; i < genericParsers.length; i++) {
+      genericParsers[i] =
+          CarbonParserFactory.createParser(input[i].getColumn(), complexDelimiters, nullFormat);
+    }
+    outputMapping = new int[output.length];
+    for (int i = 0; i < input.length; i++) {
+      for (int j = 0; j < output.length; j++) {
+        if (input[i].getColumn().equals(output[j].getColumn())) {
+          outputMapping[i] = j;
+          break;
+        }
+      }
+    }
+  }
+
+  public DataField[] getInput(CarbonDataLoadConfiguration configuration) {
+    DataField[] fields = configuration.getDataFields();
+    String[] header = configuration.getHeader();
+    numberOfColumns = header.length;
+    DataField[] input = new DataField[fields.length];
+    inputMapping = new int[input.length];
+    int k = 0;
+    for (int i = 0; i < numberOfColumns; i++) {
+      for (int j = 0; j < fields.length; j++) {
+        if (header[i].equalsIgnoreCase(fields[j].getColumn().getColName())) {
+          input[k] = fields[j];
+          inputMapping[k] = i;
+          k++;
+          break;
+        }
+      }
+    }
+    return input;
   }
 
   @Override
   public Object[] parseRow(Object[] row) {
-    for (int i = 0; i < row.length; i++) {
-      row[i] = genericParsers[i].parse(row[i].toString());
+    // If number of columns are less in a row then create new array with same size of he
--- End diff --

last word of this sentence is not correct
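The truncated comment presumably ends "same size as the header". A hedged sketch of that padding step, under that assumption (names here are made up):

```java
import java.util.Arrays;

class RowPadSketch {
  // If a row has fewer columns than the header, copy it into a new array of
  // the header's size so downstream parsers see a fixed-width row; missing
  // trailing columns become null.
  static Object[] pad(Object[] row, int numberOfColumns) {
    return (row.length < numberOfColumns) ? Arrays.copyOf(row, numberOfColumns) : row;
  }
}
```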




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87122331
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java ---
@@ -43,35 +45,58 @@
 
   private CarbonDataLoadConfiguration configuration;
 
+  private DataField[] fields;
+
   private FieldConverter[] fieldConverters;
 
-  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration) {
+  private BadRecordslogger badRecordLogger;
+
+  private BadRecordLogHolder logHolder;
+
+  public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
+      BadRecordslogger badRecordLogger) {
+    this.fields = fields;
     this.configuration = configuration;
+    this.badRecordLogger = badRecordLogger;
+  }
+
+  @Override
+  public void initialize() {
     CacheProvider cacheProvider = CacheProvider.getInstance();
     Cache cache = cacheProvider.createCache(CacheType.REVERSE_DICTIONARY,
         configuration.getTableIdentifier().getStorePath());
+    String nullFormat =
+        configuration.getDataLoadProperty(DataLoadProcessorConstants.SERIALIZATION_NULL_FORMAT)
+            .toString();
     List<FieldConverter> fieldConverterList = new ArrayList<>();
 
     long lruCacheStartTime = System.currentTimeMillis();
 
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i);
-      if (fieldConverter != null) {
-        fieldConverterList.add(fieldConverter);
-      }
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat);
+      fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordLruCacheLoadTime((System.currentTimeMillis() - lruCacheStartTime) / 1000.0);
     fieldConverters = fieldConverterList.toArray(new FieldConverter[fieldConverterList.size()]);
+    logHolder = new BadRecordLogHolder();
   }
 
   @Override
   public CarbonRow convert(CarbonRow row) throws CarbonDataLoadingException {
+    CarbonRow copy = row.getCopy();
--- End diff --

Why copy it every time? Copy it only if it is a bad record.




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87122165
  
--- Diff: processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---
@@ -102,21 +102,14 @@ public void initialize(SortParameters sortParameters) {
     }
     this.executorService = Executors.newFixedThreadPool(iterators.length);
 
-    // First prepare the data for sort.
-    Iterator[] sortPrepIterators = new Iterator[iterators.length];
-    for (int i = 0; i < sortPrepIterators.length; i++) {
-      sortPrepIterators[i] = new SortPreparatorIterator(iterators[i], inputDataFields);
-    }
-
-    for (int i = 0; i < sortDataRows.length; i++) {
-      executorService
-          .submit(new SortIteratorThread(sortPrepIterators[i], sortDataRows[i], sortParameters));
-    }
-
     try {
+      for (int i = 0; i < sortDataRows.length; i++) {
+        executorService
+            .submit(new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
--- End diff --

Should not break the line at the function call; move `submit` to the previous line and break at the parameters.
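A purely illustrative before/after of that style point; the lambda below stands in for `SortIteratorThread`, which is not reproduced here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LineBreakSketch {
  // Before: the call was broken at ".submit":
  //
  //     executorService
  //         .submit(new SortIteratorThread(iterators[i], sortDataRows[i], sortParameters));
  //
  // After: keep the call with its receiver and break at the parameter list.
  static void demo() {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    executorService.submit(
        () -> System.out.println("sort task")); // parameters moved to the new line
    executorService.shutdown();
  }
}
```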




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87120631
  
--- Diff: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---
@@ -1112,24 +1085,27 @@ case class LoadTableUsingKettle(
       val dataLoadSchema = new CarbonDataLoadSchema(table)
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-      var storeLocation = ""
       val configuredStore = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != configuredStore && configuredStore.nonEmpty) {
-        storeLocation = configuredStore(Random.nextInt(configuredStore.length))
-      }
-      if (storeLocation == null) {
-        storeLocation = System.getProperty("java.io.tmpdir")
-      }
 
       var partitionLocation = relation.tableMeta.storePath + "/partition/" +
          relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
          relation.tableMeta.carbonTableIdentifier.getTableName + "/"
 
-      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
 
       val columinar = sqlContext.getConf("carbon.is.columnar.storage", "true").toBoolean
       val kettleHomePath = CarbonScalaUtil.getKettleHome(sqlContext)
 
+      val useKettle = options.get("use_kettle") match {
+        case Some(value) => value.toBoolean
+        case _ =>
+          val useKettleLocal = System.getProperty("use.kettle")
+          if (useKettleLocal == null) {
+            sqlContext.sparkContext.getConf.get("use_kettle_default", "true").toBoolean
--- End diff --

It seems not very good to embed this option string in this class; can we move it to `CarbonOption`?




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-08 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r87117986
  
--- Diff: integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---
@@ -215,6 +227,105 @@ public static void executeGraph(CarbonLoadModel loadModel, String storeLocation,
        info, loadModel.getPartitionId(), loadModel.getCarbonDataLoadSchema());
  }
 
+  public static void executeNewDataLoad(CarbonLoadModel loadModel, String storeLocation,
--- End diff --

Can we move this class or this function out of the carbon-spark package? It is a utility class for loading and should not depend on Spark, right?




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-07 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86929386
  
--- Diff: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---
@@ -761,6 +761,11 @@ object CarbonDataRDDFactory extends Logging {
         .audit("Data load request has been received for table " + carbonLoadModel
           .getDatabaseName + "." + carbonLoadModel.getTableName
         )
+      if (!useKettle) {
+        logger.info("Data is loading with New Data Flow for table " + carbonLoadModel
--- End diff --

ok changed




[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-06 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86684431
  
--- Diff: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 ---
@@ -1198,10 +1172,16 @@ case class LoadTableUsingKettle(
 GlobalDictionaryUtil
   .generateGlobalDictionary(sqlContext, carbonLoadModel, 
relation.tableMeta.storePath,
 dataFrame)
-CarbonDataRDDFactory
-  .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, 
relation.tableMeta.storePath,
+CarbonDataRDDFactory.loadCarbonData(sqlContext,
+carbonLoadModel,
+storeLocation,
+relation.tableMeta.storePath,
--- End diff --

Actually, one is a temporary location for storing data on the local disk of the 
node. But it is not required, as it was only added later, so I am removing it.
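
For illustration, after dropping the redundant temporary location the call site would presumably reduce to something like this (a sketch keeping only the parameters visible in the quoted diff):

    CarbonDataRDDFactory.loadCarbonData(sqlContext,
      carbonLoadModel,
      relation.tableMeta.storePath)  // the single store path that remains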


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-06 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86684268
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
+splits = 
CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+  carbonLoadModel.getTableName, null, partitioner)
+  }
+
+  splits.zipWithIndex.map { s =>
+// filter the same partition unique id, because only one will 
match, so get 0 element
+val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+  p._1 == s._1.getPartition.getUniqueID)(0)._2
+new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+  }
+} else {
+  // for node partition
+  blocksGroupBy.zipWithIndex.map { b =>
+new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+  }
+}
+  }
+
+  ov

[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-06 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86684118
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
+splits = 
CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+  carbonLoadModel.getTableName, null, partitioner)
+  }
+
+  splits.zipWithIndex.map { s =>
+// filter the same partition unique id, because only one will 
match, so get 0 element
+val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+  p._1 == s._1.getPartition.getUniqueID)(0)._2
+new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+  }
+} else {
+  // for node partition
+  blocksGroupBy.zipWithIndex.map { b =>
+new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+  }
+}
+  }
+
+  ov

[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-06 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86683755
  
--- Diff: 
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessExecutor.java
 ---
@@ -0,0 +1,28 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import 
org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
+import 
org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
+import 
org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
+import 
org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
+
+/**
+ * It executes the data load.
+ */
+public final class DataLoadProcessExecutor {
+
+  public void execute(CarbonDataLoadConfiguration configuration, 
Iterator[] iterators) {
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-06 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86683750
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-06 Thread ravipesala
Github user ravipesala commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86683747
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
--- End diff --

ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r8739
  
--- Diff: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 ---
@@ -1198,10 +1172,16 @@ case class LoadTableUsingKettle(
 GlobalDictionaryUtil
   .generateGlobalDictionary(sqlContext, carbonLoadModel, 
relation.tableMeta.storePath,
 dataFrame)
-CarbonDataRDDFactory
-  .loadCarbonData(sqlContext, carbonLoadModel, storeLocation, 
relation.tableMeta.storePath,
+CarbonDataRDDFactory.loadCarbonData(sqlContext,
+carbonLoadModel,
+storeLocation,
+relation.tableMeta.storePath,
--- End diff --

Why are there two storeLocation parameters?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r8652
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
+splits = 
CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+  carbonLoadModel.getTableName, null, partitioner)
+  }
+
+  splits.zipWithIndex.map { s =>
+// filter the same partition unique id, because only one will 
match, so get 0 element
+val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+  p._1 == s._1.getPartition.getUniqueID)(0)._2
+new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+  }
+} else {
+  // for node partition
+  blocksGroupBy.zipWithIndex.map { b =>
+new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+  }
+}
+  }
+
+  overr

[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86665773
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
+splits = 
CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+  carbonLoadModel.getTableName, null, partitioner)
+  }
+
+  splits.zipWithIndex.map { s =>
+// filter the same partition unique id, because only one will 
match, so get 0 element
+val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+  p._1 == s._1.getPartition.getUniqueID)(0)._2
+new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+  }
+} else {
+  // for node partition
+  blocksGroupBy.zipWithIndex.map { b =>
+new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+  }
+}
+  }
+
+  overr

[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86665514
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
+splits = 
CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
+  carbonLoadModel.getTableName, null, partitioner)
+  }
+
+  splits.zipWithIndex.map { s =>
+// filter the same partition unique id, because only one will 
match, so get 0 element
+val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter(p =>
+  p._1 == s._1.getPartition.getUniqueID)(0)._2
+new CarbonTableSplitPartition(id, s._2, s._1, blocksDetails)
+  }
+} else {
+  // for node partition
+  blocksGroupBy.zipWithIndex.map { b =>
+new CarbonNodePartition(id, b._2, b._1._1, b._1._2)
+  }
+}
+  }
+
+  overr

[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86664431
  
--- Diff: 
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessExecutor.java
 ---
@@ -0,0 +1,28 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import 
org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl;
+import 
org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl;
+import 
org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
+import 
org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
+
+/**
+ * It executes the data load.
+ */
+public final class DataLoadProcessExecutor {
+
+  public void execute(CarbonDataLoadConfiguration configuration, 
Iterator[] iterators) {
--- End diff --

This function includes creation of the pipeline, so I suggest creating a 
factory for it. You could call it DataLoadProcessBuilder.build, and then 
execute the pipeline in CarbonLoaderUtil.
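
For illustration, a minimal sketch of the suggested factory (sketched in Scala for brevity although the processing module is Java; the step constructor signatures are assumptions based on the imports quoted above):

    object DataLoadProcessBuilder {
      // wire the four steps into one pipeline; callers only execute the result
      def build(conf: CarbonDataLoadConfiguration,
          iterators: Array[java.util.Iterator[Array[AnyRef]]]): AbstractDataLoadProcessorStep = {
        val input = new InputProcessorStepImpl(conf, iterators)
        val converted = new DataConverterProcessorStepImpl(conf, input)
        val sorted = new SortProcessorStepImpl(conf, converted)
        new DataWriterProcessorStepImpl(conf, sorted)
      }
    }

CarbonLoaderUtil would then call DataLoadProcessBuilder.build(...) and execute the returned step, keeping pipeline construction out of the execution path.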


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86664395
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
+
+  if (carbonLoadModel.isDirectLoad) {
+splits = 
CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
+  partitioner.nodeList, partitioner.partitionCount)
+  }
+  else {
--- End diff --

Move the `else` up one line so it joins the closing brace (`} else {`).
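
Applied to the quoted code, the fix would look like:

    if (carbonLoadModel.isDirectLoad) {
      splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
        partitioner.nodeList, partitioner.partitionCount)
    } else {
      splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
        carbonLoadModel.getTableName, null, partitioner)
    }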


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-carbondata pull request #263: [CARBONDATA-2] Data load integration...

2016-11-05 Thread jackylk
Github user jackylk commented on a diff in the pull request:

https://github.com/apache/incubator-carbondata/pull/263#discussion_r86664148
  
--- Diff: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
 ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.lang.Long
+import java.text.SimpleDateFormat
+import java.util
+import java.util.{Date, UUID}
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce.RecordReader
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.mapred.{CarbonHadoopMapReduceUtil, 
CarbonSerializableConfiguration}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.StandardLogService
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory
+import org.apache.carbondata.hadoop.csv.CSVInputFormat
+import org.apache.carbondata.hadoop.io.StringArrayWritable
+import org.apache.carbondata.processing.graphgenerator.GraphGenerator
+import org.apache.carbondata.spark.DataLoadResult
+import org.apache.carbondata.spark.load._
+import org.apache.carbondata.spark.splits.TableSplit
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+/**
+ * It loads the data to carbon using @AbstractDataLoadProcessorStep
+ */
+class NewCarbonDataLoadRDD[K, V](
+sc: SparkContext,
+result: DataLoadResult[K, V],
+carbonLoadModel: CarbonLoadModel,
+var storeLocation: String,
+hdfsStoreLocation: String,
+kettleHomePath: String,
+partitioner: Partitioner,
+columinar: Boolean,
+loadCount: Integer,
+tableCreationTime: Long,
+schemaLastUpdatedTime: Long,
+blocksGroupBy: Array[(String, Array[BlockDetails])],
+isTableSplitPartition: Boolean)
+  extends RDD[(K, V)](sc, Nil) with CarbonHadoopMapReduceUtil with Logging 
{
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  private val jobTrackerId: String = {
+val formatter = new SimpleDateFormat("MMddHHmm")
+formatter.format(new Date())
+  }
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so 
broadcast it
+  private val confBroadcast =
+sc.broadcast(new 
CarbonSerializableConfiguration(sc.hadoopConfiguration))
+
+  override def getPartitions: Array[Partition] = {
+if (isTableSplitPartition) {
+  // for table split partition
+  var splits = Array[TableSplit]()
--- End diff --

Unnecessary initialization; `splits` can be assigned once as a `val` from the if/else expression.
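
A sketch of that cleanup, assigning `splits` once from the if/else expression (behavior unchanged):

    val splits = if (carbonLoadModel.isDirectLoad) {
      CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
        partitioner.nodeList, partitioner.partitionCount)
    } else {
      CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
        carbonLoadModel.getTableName, null, partitioner)
    }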


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---