[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574804#comment-15574804
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user asfgit closed the pull request at:
https://github.com/apache/incubator-carbondata/pull/229
> 2. Add interfaces for data loading.
> ---
>
> Key: CARBONDATA-297
> URL: https://issues.apache.org/jira/browse/CARBONDATA-297
> Project: CarbonData
> Issue Type: Sub-task
>Reporter: Ravindra Pesala
>Assignee: Ravindra Pesala
> Fix For: 0.2.0-incubating
>
>
> Add the major interface classes for data loading so that the following JIRAs
> can use these interfaces in their implementations.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574598#comment-15574598
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83373827
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
---
@@ -73,15 +72,15 @@ public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
* Create the iterator using child iterator.
*
* @param childIter
- * @return
+ * @return new iterator with step specific processing.
*/
- protected Iterator
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15574308#comment-15574308
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83354231
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
---
@@ -73,15 +72,15 @@ public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
* Create the iterator using child iterator.
*
* @param childIter
- * @return
+ * @return new iterator with step specific processing.
*/
- protected Iterator getIterator(final Iterator
childIter) {
-return new CarbonIterator() {
+ protected Iterator getIterator(final Iterator
childIter) {
--- End diff --
In order to support batch conversion, it is better to put
`Iterator` instead
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15571835#comment-15571835
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83208961
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
+
+ /**
+ * The output meta for this step. The data returns from this step is as per this meta.
+ * @return
+ */
+ DataField[] getOutput();
+
+ /**
+ * Intialization process for this step.
+ * @param configuration
+ * @param child
+ * @throws CarbonDataLoadingException
+ */
+ void intialize(CarbonDataLoadConfiguration configuration, DataLoadProcessorStep child) throws
+ CarbonDataLoadingException;
+
+ /**
+ * Tranform the data as per the implemetation.
+ * @return Iterator of data
+ * @throws CarbonDataLoadingException
+ */
+ Iterator execute() throws CarbonDataLoadingException;
--- End diff --
I thought the SortStep is a singleton object within the executor, and if
there is only one executor per datanode, then the SortStep sorts the
data within datanode scope, which is what we want. Synchronization means
SortStep is thread-safe, so that multiple tasks can insert rows into it.
Does your design look like this? Otherwise, how do you ensure the data is
sorted within the datanode?
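The thread-safe singleton idea described above could look roughly like the sketch below. This is only an illustration under stated assumptions: `SharedSorter` and `load` are hypothetical names, rows are reduced to a single `int` sort key, and nothing here is taken from the pull request itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SharedSortSketch {

  /** Hypothetical shared sorter: one instance per executor, used by all tasks. */
  public static class SharedSorter {
    private final List<Integer> keys = new ArrayList<>();

    /** Synchronized so that several task threads can insert concurrently. */
    public synchronized void addRow(int key) {
      keys.add(key);
    }

    /** Returns a sorted copy of everything inserted so far. */
    public synchronized List<Integer> sortedRows() {
      List<Integer> copy = new ArrayList<>(keys);
      Collections.sort(copy);
      return copy;
    }
  }

  /** Starts one thread per task; all of them insert into the same sorter. */
  public static List<Integer> load(int tasks, final int rowsPerTask) {
    final SharedSorter sorter = new SharedSorter();
    List<Thread> threads = new ArrayList<>();
    for (int t = 0; t < tasks; t++) {
      final int offset = t;
      Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < rowsPerTask; i++) {
            // Keys from different tasks interleave, so the output is only
            // sorted if the shared sorter really sees all rows.
            sorter.addRow(i * 1000 + offset);
          }
        }
      });
      threads.add(thread);
      thread.start();
    }
    try {
      for (Thread thread : threads) {
        thread.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while loading", e);
    }
    return sorter.sortedRows();
  }

  public static void main(String[] args) {
    List<Integer> sorted = load(4, 100);
    System.out.println(sorted.size() + " rows, first key " + sorted.get(0));
  }
}
```

The single lock on `addRow` is what makes the sorter safe to share, and it is also where contention would show up if many tasks insert at once.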
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15570931#comment-15570931
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83147351
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/iterators/RecordReaderIterator.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow.iterators;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * This iterator iterates RecordReader.
+ */
+public class RecordReaderIterator extends CarbonIterator {
--- End diff --
It is used for iterating over a RecordReader. I could move it to the carbon-hadoop
module, but then the processing module would need to depend on that module. Since
a dependency between the processing and hadoop modules already exists, the two
would end up depending on each other.
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15570458#comment-15570458
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83130319
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
+
+ /**
+ * The output meta for this step. The data returns from this step is as per this meta.
+ * @return
+ */
+ DataField[] getOutput();
+
+ /**
+ * Intialization process for this step.
+ * @param configuration
+ * @param child
+ * @throws CarbonDataLoadingException
+ */
+ void intialize(CarbonDataLoadConfiguration configuration, DataLoadProcessorStep child) throws
+ CarbonDataLoadingException;
+
+ /**
+ * Tranform the data as per the implemetation.
+ * @return Iterator of data
+ * @throws CarbonDataLoadingException
+ */
+ Iterator execute() throws CarbonDataLoadingException;
+
+ /**
+ * Any closing of resources after step execution can be done here.
+ */
+ void finish();
--- End diff --
It should be called in both failure and success cases, so I will rename it
to `close`.
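A caller that honours the "both failure and success" contract would typically invoke `close` from a `finally` block. The sketch below illustrates that pattern only; `Step`, `RecordingStep`, and `runAndClose` are hypothetical names, and the interface is reduced to the two methods under discussion.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class CloseSketch {

  /** Hypothetical, simplified step; only execute() and close() are modelled. */
  public interface Step {
    Iterator<Object[]> execute();

    /** Releases resources; meant to run after both success and failure. */
    void close();
  }

  /** Drains the step and always calls close(), even if the load throws. */
  public static int runAndClose(Step step) {
    int rows = 0;
    try {
      Iterator<Object[]> it = step.execute();
      while (it.hasNext()) {
        it.next();
        rows++;
      }
      return rows;
    } finally {
      step.close();
    }
  }

  /** Step used for the demo: records whether close() was called. */
  public static class RecordingStep implements Step {
    private final List<Object[]> rows;
    private final boolean fail;
    public boolean closed = false;

    public RecordingStep(List<Object[]> rows, boolean fail) {
      this.rows = rows;
      this.fail = fail;
    }

    @Override
    public Iterator<Object[]> execute() {
      if (fail) {
        throw new IllegalStateException("simulated load failure");
      }
      return rows.iterator();
    }

    @Override
    public void close() {
      closed = true;
    }
  }

  public static void main(String[] args) {
    List<Object[]> rows = Arrays.asList(new Object[] {"a"}, new Object[] {"b"});
    RecordingStep ok = new RecordingStep(rows, false);
    System.out.println(runAndClose(ok) + " rows, closed=" + ok.closed);

    RecordingStep bad = new RecordingStep(rows, true);
    try {
      runAndClose(bad);
    } catch (IllegalStateException e) {
      System.out.println("failed, closed=" + bad.closed);
    }
  }
}
```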
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15570455#comment-15570455
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83130123
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
--- End diff --
ok
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569219#comment-15569219
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83049479
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
+
+ /**
+ * The output meta for this step. The data returns from this step is as per this meta.
+ * @return
+ */
+ DataField[] getOutput();
+
+ /**
+ * Intialization process for this step.
+ * @param configuration
+ * @param child
+ * @throws CarbonDataLoadingException
+ */
+ void intialize(CarbonDataLoadConfiguration configuration, DataLoadProcessorStep child) throws
+ CarbonDataLoadingException;
+
+ /**
+ * Tranform the data as per the implemetation.
+ * @return Iterator of data
+ * @throws CarbonDataLoadingException
+ */
+ Iterator execute() throws CarbonDataLoadingException;
--- End diff --
Suppose we are loading 50GB of CSV files and each HDFS block is 256MB;
the total number of partitions is then 200. If we allow one task per
partition, that means 200 tasks. In CarbonData one B-tree is created for
each task, so allowing all 200 tasks would create 200 B-trees, which is
inefficient in terms of both performance and memory.
That is why we pool multiple blocks per task in the current
Kettle implementation, and these blocks are processed in parallel. We can take
the same approach: use one iterator per thread and return an array of iterators.
What do you mean by datanode-scope sorting? How would we synchronize between
multiple tasks?
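The arithmetic and the pooling idea above can be sketched as follows. This is an illustration only: the round-robin assignment, the choice of 8 tasks, and the names `blockCount`, `poolBlocks`, and `iterators` are all assumptions, and block ids stand in for real HDFS blocks.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class BlockPoolingSketch {

  /** Number of blocks needed to hold totalBytes, rounding up. */
  public static long blockCount(long totalBytes, long blockBytes) {
    return (totalBytes + blockBytes - 1) / blockBytes;
  }

  /** Distributes block ids 0..blocks-1 round-robin over the given number of tasks. */
  public static List<List<Integer>> poolBlocks(int blocks, int tasks) {
    List<List<Integer>> pools = new ArrayList<>();
    for (int t = 0; t < tasks; t++) {
      pools.add(new ArrayList<Integer>());
    }
    for (int b = 0; b < blocks; b++) {
      pools.get(b % tasks).add(b);
    }
    return pools;
  }

  /** One iterator per pool, mirroring the "array of iterators" idea. */
  @SuppressWarnings("unchecked")
  public static Iterator<Integer>[] iterators(List<List<Integer>> pools) {
    Iterator<Integer>[] result = new Iterator[pools.size()];
    for (int i = 0; i < pools.size(); i++) {
      result[i] = pools.get(i).iterator();
    }
    return result;
  }

  public static void main(String[] args) {
    long mb = 1024L * 1024;
    long gb = 1024L * mb;
    long blocks = blockCount(50 * gb, 256 * mb); // 50GB / 256MB = 200 blocks
    int tasks = 8; // hypothetical number of tasks, chosen only for the demo
    List<List<Integer>> pools = poolBlocks((int) blocks, tasks);
    System.out.println(blocks + " blocks over " + tasks + " tasks, "
        + pools.get(0).size() + " blocks in the first task");
  }
}
```

With pooling, the number of per-task structures follows the number of tasks rather than the number of blocks, which is the point the comment makes about B-trees.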
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569067#comment-15569067
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83032703
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
+
+ /**
+ * The output meta for this step. The data returns from this step is as per this meta.
+ * @return
+ */
+ DataField[] getOutput();
+
+ /**
+ * Intialization process for this step.
+ * @param configuration
+ * @param child
+ * @throws CarbonDataLoadingException
+ */
+ void intialize(CarbonDataLoadConfiguration configuration, DataLoadProcessorStep child) throws
+ CarbonDataLoadingException;
+
+ /**
+ * Tranform the data as per the implemetation.
+ * @return Iterator of data
+ * @throws CarbonDataLoadingException
+ */
+ Iterator execute() throws CarbonDataLoadingException;
+
+ /**
+ * Any closing of resources after step execution can be done here.
+ */
+ void finish();
--- End diff --
This is called when the step finishes successfully. But what about the failure
case? Should there be a
`void close();`
method on the interface for the failure case?
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569060#comment-15569060
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83031958
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
--- End diff --
I think each implementation of this interface will have similar logic in the
`execute` function. Can we create an abstract class to implement the common logic?
The common logic would look like:
```
Iterator<Object[]> execute() throws CarbonDataLoadingException {
  // Pull rows from the child step and transform them one at a time.
  final Iterator<Object[]> childIter = child.execute();
  return new Iterator<Object[]>() {
    public boolean hasNext() {
      return childIter.hasNext();
    }
    public Object[] next() {
      // processInput is the abstract func in this class
      return processInput(childIter.next());
    }
  };
}
```
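For readers who want to run the idea, here is a self-contained sketch of the abstract-class suggestion. It is not the code from the pull request: `Step`, `AbstractStep`, and `UpperCaseStep` are hypothetical names, rows are assumed to be `Object[]`, and the configuration argument and `CarbonDataLoadingException` are left out to keep the example standalone.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class AbstractStepSketch {

  /** Hypothetical, simplified stand-in for DataLoadProcessorStep. */
  public interface Step {
    Iterator<Object[]> execute();
  }

  /** Holds the common iteration logic; subclasses only transform one row. */
  public abstract static class AbstractStep implements Step {
    private final Step child;

    protected AbstractStep(Step child) {
      this.child = child;
    }

    /** Step-specific processing of a single row. */
    protected abstract Object[] processInput(Object[] row);

    @Override
    public Iterator<Object[]> execute() {
      final Iterator<Object[]> childIter = child.execute();
      return new Iterator<Object[]>() {
        @Override
        public boolean hasNext() {
          return childIter.hasNext();
        }

        @Override
        public Object[] next() {
          return processInput(childIter.next());
        }
      };
    }
  }

  /** Example step: upper-cases every String field of a row. */
  public static class UpperCaseStep extends AbstractStep {
    public UpperCaseStep(Step child) {
      super(child);
    }

    @Override
    protected Object[] processInput(Object[] row) {
      Object[] out = new Object[row.length];
      for (int i = 0; i < row.length; i++) {
        out[i] = row[i] instanceof String ? ((String) row[i]).toUpperCase() : row[i];
      }
      return out;
    }
  }

  /** Runs a source step followed by UpperCaseStep and collects the output. */
  public static List<Object[]> run(final List<Object[]> rows) {
    Step source = new Step() {
      @Override
      public Iterator<Object[]> execute() {
        return rows.iterator();
      }
    };
    List<Object[]> result = new ArrayList<>();
    Iterator<Object[]> it = new UpperCaseStep(source).execute();
    while (it.hasNext()) {
      result.add(it.next());
    }
    return result;
  }

  public static void main(String[] args) {
    List<Object[]> rows = Arrays.asList(new Object[] {"a", 1}, new Object[] {"b", 2});
    for (Object[] row : run(rows)) {
      System.out.println(Arrays.toString(row));
    }
  }
}
```

Because rows are pulled lazily through the iterator chain, each step only needs to implement `processInput`; a step that must see all rows first (such as a sort) would not fit this row-at-a-time template.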
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569063#comment-15569063
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83030022
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
---
@@ -0,0 +1,185 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+
+public class CarbonDataLoadConfiguration {
--- End diff --
This configuration seems quite complex; I think that is because it
contains the configuration for all steps.
Can we just have a simple `Map` as the configuration and let each `Step`
decide what to keep in it?
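A rough sketch of what a map-based configuration might look like is given below. Everything in it is an assumption made for illustration: the `Map<String, Object>` type parameters, the key names, the default values, and the helper methods are hypothetical and do not come from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

public class MapConfigSketch {

  // Hypothetical keys; the real configuration fields are not shown in this thread.
  public static final String SORT_BUFFER_SIZE = "sort.buffer.size";
  public static final String CSV_DELIMITER = "csv.delimiter";

  /** What a sort step might read; falls back to an assumed default of 1000. */
  public static int sortBufferSize(Map<String, Object> conf) {
    Object value = conf.get(SORT_BUFFER_SIZE);
    return value instanceof Integer ? (Integer) value : 1000;
  }

  /** What an input step might read; falls back to an assumed default of ",". */
  public static String csvDelimiter(Map<String, Object> conf) {
    Object value = conf.get(CSV_DELIMITER);
    return value instanceof String ? (String) value : ",";
  }

  public static void main(String[] args) {
    Map<String, Object> conf = new HashMap<>();
    conf.put(SORT_BUFFER_SIZE, 5000);
    conf.put(CSV_DELIMITER, "|");
    System.out.println(sortBufferSize(conf) + " " + csvDelimiter(conf));
  }
}
```

The trade-off is that a map keeps the shared class small and lets each step own its keys, but key names and value types are then checked only at run time, whereas a typed configuration class catches such mistakes at compile time.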
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15569066#comment-15569066
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83033298
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/iterators/RecordReaderIterator.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow.iterators;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.hadoop.mapred.RecordReader;
+
+/**
+ * This iterator iterates RecordReader.
+ */
+public class RecordReaderIterator extends CarbonIterator {
--- End diff --
Why is this in the carbon-processing module rather than the carbon-hadoop module?
[
https://issues.apache.org/jira/browse/CARBONDATA-297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1556#comment-1556
]
ASF GitHub Bot commented on CARBONDATA-297:
---
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/incubator-carbondata/pull/229#discussion_r83018479
--- Diff:
processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessorStep.java
---
@@ -0,0 +1,40 @@
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+
+/**
+ * This base interface for data loading. It can do transformation jobs as per the implementation.
+ *
+ */
+public interface DataLoadProcessorStep {
+
+ /**
+ * The output meta for this step. The data returns from this step is as per this meta.
+ * @return
+ */
+ DataField[] getOutput();
+
+ /**
+ * Intialization process for this step.
+ * @param configuration
+ * @param child
+ * @throws CarbonDataLoadingException
+ */
+ void intialize(CarbonDataLoadConfiguration configuration, DataLoadProcessorStep child) throws
+ CarbonDataLoadingException;
+
+ /**
+ * Tranform the data as per the implemetation.
+ * @return Iterator of data
+ * @throws CarbonDataLoadingException
+ */
+ Iterator execute() throws CarbonDataLoadingException;
--- End diff --
I think `execute()` is called for every parallel unit of the input, right?
For example, when using Spark to load from a dataframe, `execute()` is called for
every Spark partition (one task is executed per partition). When loading from
a CSV file on HDFS, `execute()` is called for every HDFS block. So I do not think
returning an array of iterators is required.
In the CarbonData loading process within each executor, some of the steps can be
parallelized, but the sort step needs to be synchronized (a potential bottleneck),
since we need datanode-scope sorting. Am I correct?