http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java deleted file mode 100755 index 692d69e..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java +++ /dev/null @@ -1,300 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata; - -import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit; -import com.facebook.presto.carbondata.impl.CarbonTableCacheModel; -import com.facebook.presto.carbondata.impl.CarbonTableReader; -import com.facebook.presto.spi.*; -import com.facebook.presto.spi.connector.ConnectorSplitManager; -import com.facebook.presto.spi.connector.ConnectorTransactionHandle; -import com.facebook.presto.spi.predicate.Domain; -import com.facebook.presto.spi.predicate.Range; -import com.facebook.presto.spi.predicate.TupleDomain; -import com.facebook.presto.spi.type.*; -import com.google.common.collect.ImmutableList; -import io.airlift.slice.Slice; -import org.apache.carbondata.core.metadata.datatype.DataType; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; -import org.apache.carbondata.core.scan.expression.ColumnExpression; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.expression.LiteralExpression; -import org.apache.carbondata.core.scan.expression.conditional.*; -import org.apache.carbondata.core.scan.expression.logical.AndExpression; -import org.apache.carbondata.core.scan.expression.logical.OrExpression; - -import javax.inject.Inject; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - -import static com.facebook.presto.carbondata.Types.checkType; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; -import static java.util.Objects.requireNonNull; - -public class CarbondataSplitManager - implements ConnectorSplitManager -{ - - private final String connectorId; - private final CarbonTableReader carbonTableReader; - - @Inject - public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) - { - this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); - this.carbonTableReader = requireNonNull(reader, "client is null"); - } - - public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout) - { - CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle)layout; - CarbondataTableHandle tableHandle = layoutHandle.getTable(); - SchemaTableName key = tableHandle.getSchemaTableName(); - - //get all filter domain - List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint()); - - CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key); - Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable); - - if(cache != null) { - try { - List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters); - - ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder(); - for (CarbonLocalInputSplit split : splits) { - cSplits.add(new CarbondataSplit( - connectorId, - tableHandle.getSchemaTableName(), - layoutHandle.getConstraint(), - split, - rebuildConstraints - )); - } - return new FixedSplitSource(cSplits.build()); - } catch (Exception ex) { - System.out.println(ex.toString()); - } - } - return null; - } - - - public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint) - { - ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder(); - for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().get()) { - CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle"); - - constraintBuilder.add(new CarbondataColumnConstraint( - columnHandle.getColumnName(), - Optional.of(columnDomain.getDomain()), - columnHandle.isInvertedIndex())); - } - - return constraintBuilder.build(); - } - - - public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) - { - ImmutableList.Builder<Expression> filters = ImmutableList.builder(); - - Domain domain = null; - - for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) { - - CarbondataColumnHandle cdch = (CarbondataColumnHandle) c; - Type type = cdch.getColumnType(); - - List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName()); - Optional<CarbonColumn> target = ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst(); - - if(target.get() == null) - return null; - - DataType coltype = target.get().getDataType(); - ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), target.get().getDataType()); - //colExpression.setColIndex(cs.getSchemaOrdinal()); - colExpression.setDimension(target.get().isDimesion()); - colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName())); - colExpression.setCarbonColumn(target.get()); - - domain = originalConstraint.getDomains().get().get(c); - checkArgument(domain.getType().isOrderable(), "Domain type must be orderable"); - - if (domain.getValues().isNone()) { - //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName)); - //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE"; - //new Expression() - } - - if (domain.getValues().isAll()) { - //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName)); - //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL"; - } - - List<Object> singleValues = new ArrayList<>(); - List<Expression> rangeFilter = new ArrayList<>(); - for (Range range : domain.getValues().getRanges().getOrderedRanges()) { - checkState(!range.isAll()); // Already checked - if (range.isSingleValue()) { - singleValues.add(range.getLow().getValue()); - } - else - { - List<String> rangeConjuncts = new ArrayList<>(); - if (!range.getLow().isLowerUnbounded()) { - Object value = ConvertDataByType(range.getLow().getValue(), type); - switch (range.getLow().getBound()) { - case ABOVE: - if (type == TimestampType.TIMESTAMP) { - //todo not now - } else { - GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype)); - //greater.setRangeExpression(true); - rangeFilter.add(greater); - } - break; - case EXACTLY: - GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - //greater.setRangeExpression(true); - rangeFilter.add(greater); - break; - case BELOW: - throw new IllegalArgumentException("Low marker should never use BELOW bound"); - default: - throw new AssertionError("Unhandled bound: " + range.getLow().getBound()); - } - } - if (!range.getHigh().isUpperUnbounded()) { - Object value = ConvertDataByType(range.getHigh().getValue(), type); - switch (range.getHigh().getBound()) { - case ABOVE: - throw new IllegalArgumentException("High marker should never use ABOVE bound"); - case EXACTLY: - LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype)); - //less.setRangeExpression(true); - rangeFilter.add(less); - break; - case BELOW: - LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype)); - //less2.setRangeExpression(true); - rangeFilter.add(less2); - break; - default: - throw new AssertionError("Unhandled bound: " + range.getHigh().getBound()); - } - } - } - } - - if (singleValues.size() == 1) { - Expression ex = null; - if (coltype.equals(DataType.STRING)) { - ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype)); - } else - ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype)); - filters.add(ex); - } - else if(singleValues.size() > 1) { - ListExpression candidates = null; - List<Expression> exs = singleValues.stream().map((a) -> - { - return new LiteralExpression(ConvertDataByType(a, type), coltype); - }).collect(Collectors.toList()); - candidates = new ListExpression(exs); - - if(candidates != null) - filters.add(new InExpression(colExpression, candidates)); - } - else if(rangeFilter.size() > 0){ - if(rangeFilter.size() > 1) { - Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1)); - if(rangeFilter.size() > 2) - { - for(int i = 2; i< rangeFilter.size(); i++) - { - filters.add(new AndExpression(finalFilters, rangeFilter.get(i))); - } - } - } - else if(rangeFilter.size() == 1)//only have one value - filters.add(rangeFilter.get(0)); - } - } - - Expression finalFilters; - List<Expression> tmp = filters.build(); - if(tmp.size() > 1) { - finalFilters = new AndExpression(tmp.get(0), tmp.get(1)); - if(tmp.size() > 2) - { - for(int i = 2; i< tmp.size(); i++) - { - finalFilters = new AndExpression(finalFilters, tmp.get(i)); - } - } - } - else if(tmp.size() == 1) - finalFilters = tmp.get(0); - else//no filter - return null; - - return finalFilters; - } - - public static DataType Spi2CarbondataTypeMapper(Type colType) - { - if(colType == BooleanType.BOOLEAN) - return DataType.BOOLEAN; - else if(colType == SmallintType.SMALLINT) - return DataType.SHORT; - else if(colType == IntegerType.INTEGER) - return DataType.INT; - else if(colType == BigintType.BIGINT) - return DataType.LONG; - else if(colType == DoubleType.DOUBLE) - return DataType.DOUBLE; - else if(colType == DecimalType.createDecimalType()) - return DataType.DECIMAL; - else if(colType == VarcharType.VARCHAR) - return DataType.STRING; - else if(colType == DateType.DATE) - return DataType.DATE; - else if(colType == TimestampType.TIMESTAMP) - return DataType.TIMESTAMP; - else - return DataType.STRING; - } - - - public Object ConvertDataByType(Object rawdata, Type type) - { - if(type.equals(IntegerType.INTEGER)) - return new Integer((rawdata.toString())); - else if(type.equals(BigintType.BIGINT)) - return (Long)rawdata; - else if(type.equals(VarcharType.VARCHAR)) - return ((Slice)rawdata).toStringUtf8(); - else if(type.equals(BooleanType.BOOLEAN)) - return (Boolean)(rawdata); - - return rawdata; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java deleted file mode 100755 index 6b263e0..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata; - -import com.facebook.presto.spi.ConnectorTableHandle; -import com.facebook.presto.spi.SchemaTableName; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Joiner; - -import java.util.Objects; - -import static java.util.Locale.ENGLISH; -import static java.util.Objects.requireNonNull; - -public class CarbondataTableHandle - implements ConnectorTableHandle { - - private final String connectorId; - private final SchemaTableName schemaTableName; - - @JsonCreator - public CarbondataTableHandle( - @JsonProperty("connectorId") String connectorId, - @JsonProperty("schemaTableName") SchemaTableName schemaTableName) - { - this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null"); - this.schemaTableName = schemaTableName; - } - - @JsonProperty - public String getConnectorId() - { - return connectorId; - } - - @JsonProperty - public SchemaTableName getSchemaTableName() - { - return schemaTableName; - } - - @Override - public int hashCode() - { - return Objects.hash(connectorId, schemaTableName); - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if ((obj == null) || (getClass() != obj.getClass())) { - return false; - } - - CarbondataTableHandle other = (CarbondataTableHandle) obj; - return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName.equals(other.getSchemaTableName()); - } - - @Override - public String toString() - { - return Joiner.on(":").join(connectorId, schemaTableName.toString()); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java deleted file mode 100755 index 01434bd..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata; - -import com.facebook.presto.spi.ColumnHandle; -import com.facebook.presto.spi.ConnectorTableLayoutHandle; -import com.facebook.presto.spi.predicate.TupleDomain; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.Objects; - -import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; - -public class CarbondataTableLayoutHandle - implements ConnectorTableLayoutHandle -{ - private final CarbondataTableHandle table; - private final TupleDomain<ColumnHandle> constraint; - - @JsonCreator - public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table, - @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint) - { - this.table = requireNonNull(table, "table is null"); - this.constraint = requireNonNull(constraint, "constraint is null"); - } - - @JsonProperty - public CarbondataTableHandle getTable() - { - return table; - } - - @JsonProperty - public TupleDomain<ColumnHandle> getConstraint() - { - return constraint; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - - if (obj == null || getClass() != obj.getClass()) { - return false; - } - - CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj; - return Objects.equals(table, other.table) - && Objects.equals(constraint, other.constraint); - } - - @Override - public int hashCode() - { - return Objects.hash(table, constraint); - } - - @Override - public String toString() - { - return toStringHelper(this) - .add("table", table) - .add("constraint", constraint) - .toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java deleted file mode 100755 index a643a33..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata; - -import com.facebook.presto.spi.connector.ConnectorTransactionHandle; - -public enum CarbondataTransactionHandle - implements ConnectorTransactionHandle -{ - INSTANCE -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java deleted file mode 100755 index 5212dad..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata; - -import java.util.Locale; - -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - -public class Types { - private Types() {} - - public static <A, B extends A> B checkType(A value, Class<B> target, String name) - { - requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name)); - checkArgument(target.isInstance(value), - "%s must be of type %s, not %s", - name, - target.getName(), - value.getClass().getName()); - return target.cast(value); - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java deleted file mode 100755 index 6084022..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata.impl; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -import java.util.List; - -public class CarbonLocalInputSplit { - - private static final long serialVersionUID = 3520344046772190207L; - private String segmentId; - private String path; - private long start; - private long length; - private List<String> locations; - private short version; - /** - * Number of BlockLets in a block - */ - private int numberOfBlocklets = 0; - - - @JsonProperty - public short getVersion(){ - return version; - } - - @JsonProperty - public List<String> getLocations() { - return locations; - } - - @JsonProperty - public long getLength() { - return length; - } - - @JsonProperty - public long getStart() { - return start; - } - - @JsonProperty - public String getPath() { - return path; - } - - @JsonProperty - public String getSegmentId() { - return segmentId; - } - - @JsonProperty - public int getNumberOfBlocklets() { - return numberOfBlocklets; - } - - @JsonCreator - public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId, - @JsonProperty("path") String path, - @JsonProperty("start") long start, - @JsonProperty("length") long length, - @JsonProperty("locations") List<String> locations, - @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*, - @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/, - @JsonProperty("version") short version) { - this.path = path; - this.start = start; - this.length = length; - this.segmentId = segmentId; - this.locations = locations; - this.numberOfBlocklets = numberOfBlocklets; - //this.tableBlockInfo = tableBlockInfo; - this.version = version; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java deleted file mode 100755 index d47f2a5..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata.impl; - -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.util.path.CarbonTablePath; - -public class CarbonTableCacheModel { - - public CarbonTableIdentifier carbonTableIdentifier; - public CarbonTablePath carbonTablePath; - - public TableInfo tableInfo; - public CarbonTable carbonTable; - public String[] segments; - - public boolean isValid() - { - if(carbonTable != null - && carbonTablePath != null - && carbonTableIdentifier != null) - return true; - else - return false; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java deleted file mode 100755 index cd52b85..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.facebook.presto.carbondata.impl; - -import io.airlift.configuration.Config; - -import javax.validation.constraints.NotNull; - -public class CarbonTableConfig { - //read from config - private String dbPtah; - private String tablePath; - private String storePath; - - @NotNull - public String getDbPtah() { - return dbPtah; - } - - @Config("carbondata-store") - public CarbonTableConfig setDbPtah(String dbPtah) { - this.dbPtah = dbPtah; - return this; - } - - @NotNull - public String getTablePath() { - return tablePath; - } - - @Config("carbondata-store") - public CarbonTableConfig setTablePath(String tablePath) { - this.tablePath = tablePath; - return this; - } - - @NotNull - public String getStorePath() { - return storePath; - } - - @Config("carbondata-store") - public CarbonTableConfig setStorePath(String storePath) { - this.storePath = storePath; - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java deleted file mode 100755 index 40bb841..0000000 --- a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java +++ /dev/null @@ -1,736 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.facebook.presto.carbondata.impl; - -import com.facebook.presto.spi.SchemaTableName; -import com.facebook.presto.spi.classloader.ThreadContextClassLoader; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; -import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastore.*; -import org.apache.carbondata.core.datastore.block.*; -import org.apache.carbondata.core.datastore.exception.IndexBuilderException; -import org.apache.carbondata.core.datastore.filesystem.CarbonFile; -import org.apache.carbondata.core.datastore.impl.FileFactory; -import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder; -import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode; -import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; -import org.apache.carbondata.core.metadata.ColumnarFormatVersion; -import org.apache.carbondata.core.metadata.converter.SchemaConverter; -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.TableInfo; -import org.apache.carbondata.core.mutate.UpdateVO; -import org.apache.carbondata.core.reader.ThriftReader; -import org.apache.carbondata.core.scan.expression.Expression; -import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; -import org.apache.carbondata.core.scan.filter.FilterUtil; -import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; -import org.apache.carbondata.core.service.impl.PathFactory; -import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; -import org.apache.carbondata.hadoop.CacheClient; -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.lib.input.FileSplit; -import org.apache.thrift.TBase; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; -import java.util.stream.Stream; -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -import static java.util.Objects.requireNonNull; - -public class CarbonTableReader { - //CarbonTableReader will be a facade of these utils - //[ - // 1:CarbonMetadata,(logic table) - // 2:FileFactory, (physic table file) - // 3:CarbonCommonFactory, (offer some ) - // 4:DictionaryFactory, (parse dictionary util) - //] - - private CarbonTableConfig config; - private List<SchemaTableName> tableList; - private CarbonFile dbStore; - private FileFactory.FileType fileType; - - private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;//as a cache for Carbon reader - - @Inject - public CarbonTableReader(CarbonTableConfig config){ - this.config = requireNonNull(config, "CarbonTableConfig is null"); - this.cc = new ConcurrentHashMap<>(); - } - - public CarbonTableCacheModel getCarbonCache(SchemaTableName table){ - if(!cc.containsKey(table))//for worker node to initalize carbon metastore - { - try(ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) { - if(dbStore == null) { - fileType = FileFactory.getFileType(config.getStorePath()); - try{ - dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType); - }catch (Exception ex){ - throw new RuntimeException(ex); - } - } - } - updateSchemaTables(); - parseCarbonMetadata(table); - } - - if(cc.containsKey(table)) - return cc.get(table); - else - return null;//need to reload?*/ - } - - public List<String> getSchemaNames() { - return updateSchemaList(); - } - - //default PathFilter - private static final PathFilter DefaultFilter = new PathFilter() { - @Override - public boolean accept(Path path) { - return CarbonTablePath.isCarbonDataFile(path.getName()); - } - }; - - public boolean updateDbStore(){ - if(dbStore == null) { - fileType = FileFactory.getFileType(config.getStorePath()); - try{ - dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType); - }catch (Exception ex){ - throw new RuntimeException(ex); - } - } - return true; - } - - public List<String> updateSchemaList() { - updateDbStore(); - - if(dbStore != null){ - List<String> scs = Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList()); - return scs; - } - else - return ImmutableList.of(); - } - - - public Set<String> getTableNames(String schema) { - requireNonNull(schema, "schema is null"); - return updateTableList(schema); - } - - public Set<String> updateTableList(String dbName){ - List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())).collect(Collectors.toList()); - if(schema.size() > 0) - { - return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()).collect(Collectors.toSet()); - } - else - return ImmutableSet.of(); - } - - public CarbonTable getTable(SchemaTableName schemaTableName) { - try { - updateSchemaTables(); - } catch (Exception e) { - throw new RuntimeException(e); - } - - requireNonNull(schemaTableName, "schemaTableName is null"); - CarbonTable table = loadTableMetadata(schemaTableName); - - return table; - } - - - public void updateSchemaTables() - { - //update logic determine later - if(dbStore == null) - { - updateSchemaList(); - } - - tableList = new LinkedList<>(); - for(CarbonFile db: dbStore.listFiles()) - { - if(!db.getName().endsWith(".mdt")) { - for (CarbonFile table : db.listFiles()) { - tableList.add(new SchemaTableName(db.getName(), table.getName())); - } - } - } - } - - private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) - { - for (SchemaTableName table : tableList) { - if (!table.equals(schemaTableName)) - continue; - - return parseCarbonMetadata(table); - } - return null; - } - - /** - * parse carbon metadata into cc(CarbonTableReader cache) - **/ - public CarbonTable parseCarbonMetadata(SchemaTableName table) - { - CarbonTable result = null; - try { - //è¿ä¸ªåºè¯¥æ¾å¨StoreFactory - CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel()); - if(cache.isValid()) - return cache.carbonTable; - - //Step1: get table meta path, load carbon table param - String storePath = config.getStorePath(); - cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString()); - cache.carbonTablePath = PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier); - cc.put(table, cache); - - //Step2: check file existed? read schema file - ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() { - public TBase create() { - return new org.apache.carbondata.format.TableInfo(); - } - }; - ThriftReader thriftReader = - new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase); - thriftReader.open(); - org.apache.carbondata.format.TableInfo tableInfo = - (org.apache.carbondata.format.TableInfo) thriftReader.read(); - thriftReader.close(); - - //Format LevelçTableInfoï¼ éè¦è½¬æ¢æCode LevelçTableInfo - SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl(); - TableInfo wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(), - storePath); - wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath())); - //å è½½å°CarbonMetadataä»åº - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo); - - cache.tableInfo = wrapperTableInfo; - cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName()); - result = cache.carbonTable; - }catch (Exception ex) { - throw new RuntimeException(ex); - } - - return result; - } - - public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception { - - //å¤çfilter, ä¸æ¨filterï¼å°åºç¨å¨Segmentçç´¢å¼ä¸ - FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); - - AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier(); - CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath()); - List<String> invalidSegments = new ArrayList<>(); - List<UpdateVO> invalidTimestampsList = new ArrayList<>(); - - // get all valid segments and set them into the configuration - SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments(); - - tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]); - if (segments.getValidSegments().size() == 0) { - return new ArrayList<>(0); - } - - // remove entry in the segment index if there are invalid segments - invalidSegments.addAll(segments.getInvalidSegments()); - for (String invalidSegmentId : invalidSegments) { - invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId)); - } - if (invalidSegments.size() > 0) { - List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size()); - for(String segId: invalidSegments) { - invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId)); - } - cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds); - } - - // get filter for segment - CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable); - FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier()); - - List<CarbonLocalInputSplit> result = new ArrayList<>(); - //for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : tableCacheModel.segments) { - try{ - List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager); - for (DataRefNode dataRefNode : dataRefNodes) { - BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode; - TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo(); - - if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) { - continue; - } - result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(), - tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(), - Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(), - tableBlockInfo.getVersion().number())); - } - }catch (Exception ex){ - throw new RuntimeException(ex); - } - } - cacheClient.close(); - return result; - } - - /** - * get data blocks of given segment - */ - private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor, - AbsoluteTableIdentifier absoluteTableIdentifier, - CarbonTablePath tablePath, - FilterResolverIntf resolver, - String segmentId, - CacheClient cacheClient, - SegmentUpdateStatusManager updateStatusManager) throws IOException { - //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance(); - //QueryStatistic statistic = new QueryStatistic(); - - //读åSegment å é¨çIndex - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = - getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient, - updateStatusManager); - - List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>(); - - if (null != segmentIndexMap) { - // build result - for (AbstractIndex abstractIndex : segmentIndexMap.values()) { - List<DataRefNode> filterredBlocks; - // if no filter is given get all blocks from Btree Index - if (null == resolver) { - filterredBlocks = getDataBlocksOfIndex(abstractIndex); - } else { - //ignore filter - //filterredBlocks = getDataBlocksOfIndex(abstractIndex); - - // apply filter and get matching blocks - filterredBlocks = filterExpressionProcessor - .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex, - absoluteTableIdentifier); - } - resultFilterredBlocks.addAll(filterredBlocks); - } - } - //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis()); - //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/); - return resultFilterredBlocks; - } - - private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper, - UpdateVO updateDetails) { - if (null != updateDetails.getLatestUpdateTimestamp() - && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper - .getRefreshedTimeStamp()) { - return true; - } - return false; - } - - private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/ - AbsoluteTableIdentifier absoluteTableIdentifier, - CarbonTablePath tablePath, - String segmentId, - CacheClient cacheClient, - SegmentUpdateStatusManager updateStatusManager) throws IOException { - Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null; - SegmentTaskIndexWrapper segmentTaskIndexWrapper = null; - boolean isSegmentUpdated = false; - Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null; - TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier = - new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId); - segmentTaskIndexWrapper = - cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier); - UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId); - if (null != segmentTaskIndexWrapper) { - segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); - if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) { - taskKeys = segmentIndexMap.keySet(); - isSegmentUpdated = true; - } - } - - // if segment tree is not loaded, load the segment tree - if (segmentIndexMap == null || isSegmentUpdated) { - - List<FileStatus> fileStatusList = new LinkedList<FileStatus>(); - List<String> segs = new ArrayList<>(); - segs.add(segmentId); - - FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList); - List<InputSplit> splits = getSplit(fileStatusList, fs); - - List<FileSplit> carbonSplits = new ArrayList<>(); - for (InputSplit inputSplit : splits) { - FileSplit fileSplit = (FileSplit) inputSplit; - String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());//è¿éçseperatoråºè¯¥æä¹å ï¼ï¼ - if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) { - continue; - } - carbonSplits.add(fileSplit); - } - - List<TableBlockInfo> tableBlockInfoList = new ArrayList<>(); - for (FileSplit inputSplit : carbonSplits) { - if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) { - - BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick? - tableBlockInfoList.add( - new TableBlockInfo(inputSplit.getPath().toString(), - inputSplit.getStart(), - segmentId, - inputSplit.getLocations(), - inputSplit.getLength(), - blockletInfos, - ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), - null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//è¿éçnullæ¯å¦ä¼å¼å¸¸ï¼ - } - } - - Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>(); - segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList); - // get Btree blocks for given segment - tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos); - tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated); - segmentTaskIndexWrapper = - cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier); - segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap(); - } - return segmentIndexMap; - } - - private boolean isValidBlockBasedOnUpdateDetails( - Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit, - UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) { - String taskID = null; - if (null != carbonInputSplit) { - if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) { - return false; - } - - if (null == taskKeys) { - return true; - } - - taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName()); - String bucketNo = - CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName()); - - SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder = - new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo); - - String blockTimestamp = carbonInputSplit.getPath().getName() - .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1, - carbonInputSplit.getPath().getName().lastIndexOf('.')); - if (!(updateDetails.getUpdateDeltaStartTimestamp() != null - && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) { - if (!taskKeys.contains(taskBucketHolder)) { - return true; - } - } - } - return false; - } - - private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem) throws IOException { - - Iterator split = fileStatusList.iterator(); - - List<InputSplit> splits = new ArrayList<>(); - - while (true) - { - while (true) - { - while(split.hasNext()) { - FileStatus file = (FileStatus) split.next(); - Path path = file.getPath(); - long length = file.getLen(); - if (length != 0L) { - BlockLocation[] blkLocations; - if (file instanceof LocatedFileStatus) { - blkLocations = ((LocatedFileStatus) file).getBlockLocations(); - } else { - blkLocations = targetSystem.getFileBlockLocations(file, 0L, length); - } - - if (this.isSplitable()) { - long blockSize1 = file.getBlockSize(); - long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE); - - long bytesRemaining; - int blkIndex; - for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; bytesRemaining -= splitSize) { - blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts())); - } - - if (bytesRemaining != 0L) { - blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts())); - } - } - else - { - splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, blkLocations[0].getHosts())); - } - } - else { - splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, new String[0])); - } - } - return splits; - } - } - - } - - private String[] getValidPartitions() { - //TODO: has to Identify partitions by partition pruning - return new String[] { "0" }; - } - - private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, - CarbonTablePath tablePath, - List<FileStatus> result) throws IOException { - String[] partitionsToConsider = getValidPartitions(); - if (partitionsToConsider.length == 0) { - throw new IOException("No partitions/data found"); - } - - FileSystem fs = null; - - //PathFilter inputFilter = getDataFileFilter(job); - - // get tokens for all the required FileSystem for table path - /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath }, - job.getConfiguration());*/ - - //get all data files of valid partitions and segments - for (int i = 0; i < partitionsToConsider.length; ++i) { - String partition = partitionsToConsider[i]; - - for (int j = 0; j < segmentsToConsider.length; ++j) { - String segmentId = segmentsToConsider[j]; - Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId)); - - try{ - Configuration conf = new Configuration(); - fs = segmentPath.getFileSystem(conf); - //fs.initialize(segmentPath.toUri(), conf); - - RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath); - while (iter.hasNext()) { - LocatedFileStatus stat = iter.next(); - //if(stat.getPath().toString().contains("carbondata"))//åçcarbondataçcarbonInputFilterçå®ç° - if (DefaultFilter.accept(stat.getPath())) - { - if (stat.isDirectory()) { - addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter); - } else { - result.add(stat); - } - } - } - }catch (Exception ex){ - System.out.println(ex.toString()); - } - } - } - return fs; - } - - protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException { - RemoteIterator iter = fs.listLocatedStatus(path); - - while(iter.hasNext()) { - LocatedFileStatus stat = (LocatedFileStatus)iter.next(); - if(inputFilter.accept(stat.getPath())) { - if(stat.isDirectory()) { - this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter); - } else { - result.add(stat); - } - } - } - - } - - /** - * get data blocks of given btree - */ - private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) { - List<DataRefNode> blocks = new LinkedList<DataRefNode>(); - SegmentProperties segmentProperties = abstractIndex.getSegmentProperties(); - - try { - IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties); - IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties); - - // Add all blocks of btree into result - DataRefNodeFinder blockFinder = - new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize()); - DataRefNode startBlock = - blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey); - DataRefNode endBlock = - blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey); - while (startBlock != endBlock) { - blocks.add(startBlock); - startBlock = startBlock.getNextDataRefNode(); - } - blocks.add(endBlock); - - } catch (KeyGenException e) { - System.out.println("Could not generate start key" + e.getMessage()); - } - return blocks; - } - - private boolean isSplitable() { - try { - // Don't split the file if it is local file system - if(this.fileType == FileFactory.FileType.LOCAL) - { - return false; - } - } catch (Exception e) { - return true; - } - return true; - } - - private long computeSplitSize(long blockSize, long minSize, - long maxSize) { - return Math.max(minSize, Math.min(maxSize, blockSize)); - } - - private FileSplit makeSplit(Path file, long start, long length, - String[] hosts) { - return new FileSplit(file, start, length, hosts); - } - - private int getBlockIndex(BlockLocation[] blkLocations, - long offset) { - for (int i = 0 ; i < blkLocations.length; i++) { - // is the offset inside this block? - if ((blkLocations[i].getOffset() <= offset) && - (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){ - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length -1]; - long fileLength = last.getOffset() + last.getLength() -1; - throw new IllegalArgumentException("Offset " + offset + - " is outside of file (0.." + - fileLength + ")"); - } - - - /** - * get total number of rows. for count(*) - * - * @throws IOException - * @throws IndexBuilderException - */ - public long getRowCount() throws IOException, IndexBuilderException { - long rowCount = 0; - /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier(); - - // no of core to load the blocks in driver - //addSegmentsIfEmpty(job, absoluteTableIdentifier); - int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; - try { - numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT)); - } catch (NumberFormatException e) { - numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; - } - // creating a thread pool - ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores); - List<Future<Map<String, AbstractIndex>>> loadedBlocks = - new ArrayList<Future<Map<String, AbstractIndex>>>(); - //for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : this.segmentList) { - // submitting the task - loadedBlocks - .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo))); - } - threadPool.shutdown(); - try { - threadPool.awaitTermination(1, TimeUnit.HOURS); - } catch (InterruptedException e) { - throw new IndexBuilderException(e); - } - try { - // adding all the rows of the blocks to get the total row - // count - for (Future<Map<String, AbstractIndex>> block : loadedBlocks) { - for (AbstractIndex abstractIndex : block.get().values()) { - rowCount += abstractIndex.getTotalNumberOfRows(); - } - } - } catch (InterruptedException | ExecutionException e) { - throw new IndexBuilderException(e); - }*/ - return rowCount; - } -} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java new file mode 100755 index 0000000..f2f69d9 --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.facebook.presto.spi.predicate.Domain; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonSetter; + +import java.util.Objects; +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class CarbondataColumnConstraint { + private final String name; + private final boolean invertedindexed; + private Optional<Domain> domain; + + @JsonCreator + public CarbondataColumnConstraint( + @JsonProperty("name") String name, + @JsonProperty("domain") Optional<Domain> domain, + @JsonProperty("invertedindexed") boolean invertedindexed) + { + this.name = requireNonNull(name, "name is null"); + this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null"); + this.domain = requireNonNull(domain, "domain is null"); + } + + @JsonProperty + public boolean isInvertedindexed() + { + return invertedindexed; + } + + @JsonProperty + public String getName() + { + return name; + } + + @JsonProperty + public Optional<Domain> getDomain() + { + return domain; + } + + @JsonSetter + public void setDomain(Optional<Domain> domain) + { + this.domain = domain; + } + + @Override + public int hashCode() + { + return Objects.hash(name, domain, invertedindexed); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj; + return Objects.equals(this.name, other.name) + && Objects.equals(this.domain, other.domain) + && Objects.equals(this.invertedindexed, other.invertedindexed); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("name", this.name) + .add("invertedindexed", this.invertedindexed) + .add("domain", this.domain) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java new file mode 100755 index 0000000..252556a --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.type.Type; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + +public class CarbondataColumnHandle + implements ColumnHandle +{ + private final String connectorId; + private final String columnName; + + public boolean isInvertedIndex() { + return isInvertedIndex; + } + + private final Type columnType; + private final int ordinalPosition; + private final int keyOrdinal; + private final int columnGroupOrdinal; + + private final int columnGroupId; + private final String columnUniqueId; + private final boolean isInvertedIndex; + + public boolean isMeasure() { + return isMeasure; + } + + private final boolean isMeasure; + + public int getKeyOrdinal() { + return keyOrdinal; + } + + public int getColumnGroupOrdinal() { + return columnGroupOrdinal; + } + + public int getColumnGroupId() { + return columnGroupId; + } + + public String getColumnUniqueId() { + return columnUniqueId; + } + /* ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table + IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1 from tablename) + The columnhandle of clm3 : has ordinalposition = 3 + */ + + @JsonCreator + public CarbondataColumnHandle( + @JsonProperty("connectorId") String connectorId, + @JsonProperty("columnName") String columnName, + @JsonProperty("columnType") Type columnType, + @JsonProperty("ordinalPosition") int ordinalPosition, + @JsonProperty("keyOrdinal") int keyOrdinal, + @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal, + @JsonProperty("isMeasure") boolean isMeasure, + @JsonProperty("columnGroupId") int columnGroupId, + @JsonProperty("columnUniqueId") String columnUniqueId, + @JsonProperty("isInvertedIndex") boolean isInvertedIndex) + { + this.connectorId = requireNonNull(connectorId, "connectorId is null"); + this.columnName = requireNonNull(columnName, "columnName is null"); + this.columnType = requireNonNull(columnType, "columnType is null"); + + this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null"); + this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null"); + this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null"); + + this.isMeasure = isMeasure; + this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null"); + this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null"); + this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null"); + } + + @JsonProperty + public String getConnectorId() + { + return connectorId; + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + @JsonProperty + public int getOrdinalPosition() + { + return ordinalPosition; + } + + public ColumnMetadata getColumnMetadata() + { + return new ColumnMetadata(columnName, columnType, null, false); + } + + @Override + public int hashCode() + { + return Objects.hash(connectorId, columnName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + CarbondataColumnHandle other = (CarbondataColumnHandle) obj; + return Objects.equals(this.connectorId, other.connectorId) && + Objects.equals(this.columnName, other.columnName); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("connectorId", connectorId) + .add("columnName", columnName) + .add("columnType", columnType) + .add("ordinalPosition", ordinalPosition) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java new file mode 100755 index 0000000..90b4944 --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.facebook.presto.spi.connector.*; +import com.facebook.presto.spi.transaction.IsolationLevel; +import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.log.Logger; + +import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED; +import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; + +public class CarbondataConnector + implements Connector +{ + + private static final Logger log = Logger.get(CarbondataConnector.class); + + private final LifeCycleManager lifeCycleManager; + private final CarbondataMetadata metadata; + private final ConnectorSplitManager splitManager; + private final ConnectorRecordSetProvider recordSetProvider; + private final ClassLoader classLoader; + + + public CarbondataConnector(LifeCycleManager lifeCycleManager, + CarbondataMetadata metadata, + ConnectorSplitManager splitManager, + ConnectorRecordSetProvider recordSetProvider, + ClassLoader classLoader) + { + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null"); + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) { + checkConnectorSupports(READ_COMMITTED, isolationLevel); + return CarbondataTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) { + metadata.putClassLoader(classLoader); + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() { + return splitManager; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() + { + return recordSetProvider; + } + + @Override + public final void shutdown() + { + try { + lifeCycleManager.stop(); + } + catch (Exception e) { + log.error(e, "Error shutting down connector"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java new file mode 100755 index 0000000..324699c --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.classloader.ThreadContextClassLoader; +import com.facebook.presto.spi.connector.*; +import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager; +import com.google.common.base.Throwables; +import com.google.inject.Injector; +import io.airlift.bootstrap.Bootstrap; +import io.airlift.bootstrap.LifeCycleManager; +import io.airlift.json.JsonModule; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class CarbondataConnectorFactory + implements ConnectorFactory { + + private final String name; + private final ClassLoader classLoader; + + public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader){ + this.name = connectorName; + this.classLoader = requireNonNull(classLoader, "classLoader is null"); + } + + + @Override + public String getName() { + return name; + } + + @Override + public ConnectorHandleResolver getHandleResolver() { + return new CarbondataHandleResolver(); + } + + @Override + public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) { + requireNonNull(config, "config is null"); + + try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { + Bootstrap app = new Bootstrap(new JsonModule(), new CarbondataModule(connectorId, context.getTypeManager())); + + Injector injector = app + .strictConfig() + .doNotInitializeLogging() + .setRequiredConfigurationProperties(config) + .initialize(); + + LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class); + CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class); + //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class); + ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class); + ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class); + //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class); + + + //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class); + //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class); + //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class); + //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class); + //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class); + + + return new CarbondataConnector( + lifeCycleManager, + metadata, + new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), + connectorRecordSet,//new ClassLoaderSafeConnectorRecordSetProvider(, classLoader), + classLoader + //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader), + //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader), + //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader), + ); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/32bf2961/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java new file mode 100755 index 0000000..5aa72f1 --- /dev/null +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.presto; + +import com.google.inject.Inject; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class CarbondataConnectorId +{ + private final String id; + + @Inject + public CarbondataConnectorId(String id) + { + this.id = requireNonNull(id, "id is null"); + } + + @Override + public String toString() + { + return id; + } + + @Override + public int hashCode() + { + return Objects.hash(id); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + + if ((obj == null) || (getClass() != obj.getClass())) { + return false; + } + + return Objects.equals(this.id, ((CarbondataConnectorId) obj).id); + } +} \ No newline at end of file