Repository: incubator-carbondata Updated Branches: refs/heads/master cb480e0dd -> eac728d11
Added interface for carbon data loading Updated as per review comments Updated interface as per review comments Added carbon row. Added carbon row batch. Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/779fd08e Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/779fd08e Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/779fd08e Branch: refs/heads/master Commit: 779fd08e0577287d577f316d1214de1f1b7687be Parents: cb480e0 Author: ravipesala <ravi.pes...@gmail.com> Authored: Tue Oct 11 21:33:48 2016 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Fri Oct 14 17:22:04 2016 +0800 ---------------------------------------------------------------------- .../newflow/AbstractDataLoadProcessorStep.java | 124 ++++++++++++++++ .../newflow/CarbonDataLoadConfiguration.java | 144 +++++++++++++++++++ .../processing/newflow/DataField.java | 57 ++++++++ .../constants/DataLoadProcessorConstants.java | 36 +++++ .../exception/CarbonDataLoadingException.java | 88 ++++++++++++ .../processing/newflow/row/CarbonRow.java | 71 +++++++++ .../processing/newflow/row/CarbonRowBatch.java | 42 ++++++ 7 files changed, 562 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java new file mode 100644 index 0000000..69fe511 --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java @@ -0,0 +1,124 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
+
+/**
+ * Base abstract class for a step of the data loading flow.
+ * Steps are chained: by default a step pulls row batches from its child step
+ * and transforms them as per the implementation.
+ */
+public abstract class AbstractDataLoadProcessorStep {
+
+  protected CarbonDataLoadConfiguration configuration;
+
+  protected AbstractDataLoadProcessorStep child;
+
+  /**
+   * @param configuration data load configuration for this step
+   * @param child the step whose output batches this step consumes
+   */
+  public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration,
+      AbstractDataLoadProcessorStep child) {
+    this.configuration = configuration;
+    this.child = child;
+  }
+
+  /**
+   * The output meta for this step. The data returned from this step follows this meta.
+   *
+   * @return fields describing the rows produced by this step
+   */
+  public abstract DataField[] getOutput();
+
+  /**
+   * Initialization process for this step.
+   *
+   * @throws CarbonDataLoadingException if the step cannot be initialized
+   */
+  public abstract void intialize() throws CarbonDataLoadingException;
+
+  /**
+   * Transforms the data as per the implementation.
+   * The default implementation executes the child step and wraps each of the
+   * child's iterators with {@link #getIterator(Iterator)}, so the returned array
+   * has the same length as the child's. It dereferences {@code child}, so a step
+   * created without a child must override this method.
+   *
+   * @return array of iterators over row batches. They can be processed in parallel
+   *     if the implementation class wants
+   * @throws CarbonDataLoadingException if executing the child step fails
+   */
+  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
+    Iterator<CarbonRowBatch>[] childIters = child.execute();
+    // Java cannot create a generic array, so a raw array is created; it is safe
+    // because every slot is filled with an Iterator<CarbonRowBatch> below.
+    @SuppressWarnings("unchecked")
+    Iterator<CarbonRowBatch>[] iterators = new Iterator[childIters.length];
+    for (int i = 0; i < childIters.length; i++) {
+      iterators[i] = getIterator(childIters[i]);
+    }
+    return iterators;
+  }
+
+  /**
+   * Creates the iterator for this step on top of a child iterator.
+   *
+   * @param childIter iterator over the batches produced by the child step
+   * @return new iterator that applies {@link #processRowBatch(CarbonRowBatch)} to each
+   *     child batch as it is requested
+   */
+  protected Iterator<CarbonRowBatch> getIterator(final Iterator<CarbonRowBatch> childIter) {
+    return new CarbonIterator<CarbonRowBatch>() {
+      @Override public boolean hasNext() {
+        return childIter.hasNext();
+      }
+
+      @Override public CarbonRowBatch next() {
+        return processRowBatch(childIter.next());
+      }
+    };
+  }
+
+  /**
+   * Processes a batch of rows as per the step logic.
+   *
+   * @param rowBatch batch received from the child step
+   * @return new batch holding the result of {@link #processRow(CarbonRow)} for each
+   *     input row, in the same order
+   */
+  protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch) {
+    CarbonRowBatch newBatch = new CarbonRowBatch();
+    Iterator<CarbonRow> batchIterator = rowBatch.getBatchIterator();
+    while (batchIterator.hasNext()) {
+      newBatch.addRow(processRow(batchIterator.next()));
+    }
+    return newBatch;
+  }
+
+  /**
+   * Processes a single row as per the step logic.
+   *
+   * @param row row to process
+   * @return processed row
+   */
+  protected abstract CarbonRow processRow(CarbonRow row);
+
+  /**
+   * Called when the task completes successfully.
+   */
+  public abstract void finish();
+
+  /**
+   * Closing of resources after step execution can be done here.
+   */
+  public abstract void close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
new file mode 100644
index 0000000..20013ce
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+
+/**
+ * Settings of a single data load: the fields to load, the target table
+ * identifier, the input header, the partition/segment/task identifiers and a
+ * map of additional named properties.
+ */
+public class CarbonDataLoadConfiguration {
+
+  private DataField[] dataFields;
+
+  private AbsoluteTableIdentifier tableIdentifier;
+
+  private String[] header;
+
+  private String partitionId;
+
+  private String segmentId;
+
+  private String taskNo;
+
+  /** Additional load properties, keyed by property name. */
+  private Map<String, Object> dataLoadProperties = new HashMap<>();
+
+  /**
+   * Counts the configured fields whose column is a dimension.
+   * The data fields must have been set before calling this.
+   *
+   * @return number of dimension fields
+   */
+  public int getDimensionCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (field.getColumn().isDimesion()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Counts the dimension fields that do not have dictionary encoding.
+   *
+   * @return number of no-dictionary dimension fields
+   */
+  public int getNoDictionaryCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      boolean isNoDictionaryDimension =
+          field.getColumn().isDimesion() && !field.hasDictionaryEncoding();
+      if (isNoDictionaryDimension) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Counts the fields whose column is complex.
+   *
+   * @return number of complex fields
+   */
+  public int getComplexDimensionCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (field.getColumn().isComplex()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Counts the fields whose column is not a dimension (the measures).
+   *
+   * @return number of measure fields
+   */
+  public int getMeasureCount() {
+    int count = 0;
+    for (DataField field : dataFields) {
+      if (!field.getColumn().isDimesion()) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  public DataField[] getDataFields() {
+    return dataFields;
+  }
+
+  public void setDataFields(DataField[] dataFields) {
+    this.dataFields = dataFields;
+  }
+
+  public String[] getHeader() {
+    return header;
+  }
+
+  public void setHeader(String[] header) {
+    this.header = header;
+  }
+
+  public AbsoluteTableIdentifier getTableIdentifier() {
+    return tableIdentifier;
+  }
+
+  public void setTableIdentifier(AbsoluteTableIdentifier tableIdentifier) {
+    this.tableIdentifier = tableIdentifier;
+  }
+
+  public String getPartitionId() {
+    return partitionId;
+  }
+
+  public void setPartitionId(String partitionId) {
+    this.partitionId = partitionId;
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  public void setSegmentId(String segmentId) {
+    this.segmentId = segmentId;
+  }
+
+  public String getTaskNo() {
+    return taskNo;
+  }
+
+  public void setTaskNo(String taskNo) {
+    this.taskNo = taskNo;
+  }
+
+  /**
+   * Stores an additional load property, replacing any previous value for the key.
+   */
+  public void setDataLoadProperty(String key, Object value) {
+    dataLoadProperties.put(key, value);
+  }
+
+  /**
+   * @return the property stored under {@code key}, or {@code defaultValue} when the
+   *     key is absent or mapped to null
+   */
+  public Object getDataLoadProperty(String key, Object defaultValue) {
+    Object value = dataLoadProperties.get(key);
+    return value != null ? value : defaultValue;
+  }
+
+  /**
+   * @return the property stored under {@code key}, or null when it is absent
+   */
+  public Object getDataLoadProperty(String key) {
+    return dataLoadProperties.get(key);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
new file mode 100644
index 0000000..3e6d63e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataField.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.carbon.metadata.blocklet.compressor.CompressionCodec;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+
+/**
+ * Metadata class for each column of a table: the column definition together
+ * with its compression codec.
+ */
+public class DataField implements Serializable {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  // NOTE(review): Java serialization of a DataField also serializes these two
+  // non-transient fields; confirm CarbonColumn and CompressionCodec are Serializable.
+  private CarbonColumn column;
+
+  private CompressionCodec compressionCodec;
+
+  /**
+   * @return true if the column has DICTIONARY encoding. The column must have been
+   *     set before calling this.
+   */
+  public boolean hasDictionaryEncoding() {
+    return column.hasEncoding(Encoding.DICTIONARY);
+  }
+
+  public CarbonColumn getColumn() {
+    return column;
+  }
+
+  public void setColumn(CarbonColumn column) {
+    this.column = column;
+  }
+
+  public CompressionCodec getCompressionCodec() {
+    return compressionCodec;
+  }
+
+  public void setCompressionCodec(CompressionCodec compressionCodec) {
+    this.compressionCodec = compressionCodec;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
new file mode 100644
index 0000000..9d35ccb
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/constants/DataLoadProcessorConstants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.constants;
+
+/**
+ * Constants used in data loading. Each constant's value is identical to its name.
+ */
+public final class DataLoadProcessorConstants {
+
+  public static final String TEMP_STORE_LOCATION = "TEMP_STORE_LOCATION";
+
+  public static final String BLOCKLET_SIZE = "BLOCKLET_SIZE";
+
+  public static final String SORT_SIZE = "SORT_SIZE";
+
+  public static final String FACT_TIME_STAMP = "FACT_TIME_STAMP";
+
+  public static final String COMPLEX_DELIMITERS = "COMPLEX_DELIMITERS";
+
+  public static final String DIMENSION_LENGTHS = "DIMENSION_LENGTHS";
+
+  /**
+   * Constants holder only; not meant to be instantiated.
+   */
+  private DataLoadProcessorConstants() {
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
new file mode 100644
index 0000000..c26e2de
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/exception/CarbonDataLoadingException.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.exception;
+
+import java.util.Locale;
+
+/**
+ * Checked exception thrown by the data loading flow.
+ * The message is kept only in {@link Throwable}'s detail message, so
+ * {@link #getMessage()} always reflects what was passed to the constructor.
+ */
+public class CarbonDataLoadingException extends Exception {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonDataLoadingException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   * @param t the cause of this exception
+   */
+  public CarbonDataLoadingException(String msg, Throwable t) {
+    super(msg, t);
+  }
+
+  /**
+   * Constructor. As with {@link Exception#Exception(Throwable)}, the message is
+   * {@code t.toString()} (or null when {@code t} is null), so the cause's
+   * description is not lost.
+   *
+   * @param t the cause of this exception
+   */
+  public CarbonDataLoadingException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * This method is used to get the localized message.
+   * No locale-specific messages exist, so the locale is ignored and the result is
+   * the same as {@link #getLocalizedMessage()}.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region.
+   * @return - Localized error message.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return getLocalizedMessage();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
new file mode 100644
index 0000000..e1aa601
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.row;
+
+import java.math.BigDecimal;
+
+/**
+ * A single row of data handed from one data loading step to the next.
+ * It wraps the given {@code Object[]} without copying it, so changes made through
+ * {@link #update(Object, int)} are visible in the array returned by
+ * {@link #getData()}, and vice versa.
+ */
+public class CarbonRow {
+
+  private final Object[] data;
+
+  /**
+   * @param data column values of this row; the array is used directly, not copied
+   */
+  public CarbonRow(Object[] data) {
+    this.data = data;
+  }
+
+  /**
+   * @return the backing array of column values (not a copy)
+   */
+  public Object[] getData() {
+    return data;
+  }
+
+  // The typed getters cast the stored value to the requested type: a value of
+  // another type raises ClassCastException, and a null value read through a
+  // primitive getter raises NullPointerException on unboxing.
+
+  public int getInt(int ordinal) {
+    Integer boxed = (Integer) valueAt(ordinal);
+    return boxed.intValue();
+  }
+
+  public long getLong(int ordinal) {
+    Long boxed = (Long) valueAt(ordinal);
+    return boxed.longValue();
+  }
+
+  public float getFloat(int ordinal) {
+    Float boxed = (Float) valueAt(ordinal);
+    return boxed.floatValue();
+  }
+
+  public double getDouble(int ordinal) {
+    Double boxed = (Double) valueAt(ordinal);
+    return boxed.doubleValue();
+  }
+
+  public BigDecimal getDecimal(int ordinal) {
+    Object value = valueAt(ordinal);
+    return (BigDecimal) value;
+  }
+
+  public String getString(int ordinal) {
+    Object value = valueAt(ordinal);
+    return (String) value;
+  }
+
+  public byte[] getBinary(int ordinal) {
+    Object value = valueAt(ordinal);
+    return (byte[]) value;
+  }
+
+  /**
+   * Replaces the value at {@code ordinal} in the backing array.
+   */
+  public void update(Object value, int ordinal) {
+    data[ordinal] = value;
+  }
+
+  /** Raw value at the given position of the backing array. */
+  private Object valueAt(int ordinal) {
+    return data[ordinal];
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/779fd08e/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
new file mode 100644
index 0000000..b26a60e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.newflow.row;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Batch of rows, kept in insertion order.
+ */
+public class CarbonRowBatch {
+
+  private final List<CarbonRow> rows = new ArrayList<>();
+
+  /**
+   * Appends a row to the end of this batch.
+   *
+   * @param carbonRow row to append
+   */
+  public void addRow(CarbonRow carbonRow) {
+    rows.add(carbonRow);
+  }
+
+  /**
+   * @return iterator over the rows in insertion order. It is backed by this batch,
+   *     so its {@code remove()} removes rows from the batch.
+   */
+  public Iterator<CarbonRow> getBatchIterator() {
+    return rows.iterator();
+  }
+}