Repository: incubator-carbondata
Updated Branches:
  refs/heads/presto 3bdda74fe -> ca0ecbd50
refactor integration/presto Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/d3bbc709 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/d3bbc709 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/d3bbc709 Branch: refs/heads/presto Commit: d3bbc70969343104b43739fd87095eb30983e1c9 Parents: 3bdda74 Author: chenliang613 <chenliang...@huawei.com> Authored: Sat Apr 8 15:07:47 2017 +0530 Committer: chenliang613 <chenliang...@huawei.com> Committed: Sat Apr 8 15:07:47 2017 +0530 ---------------------------------------------------------------------- .../presto/CarbondataConnectorFactory.java | 12 ------ .../carbondata/presto/CarbondataMetadata.java | 34 ++++++++--------- .../presto/CarbondataRecordSetProvider.java | 12 +++--- .../presto/impl/CarbonTableReader.java | 40 +++++++++----------- 4 files changed, 37 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d3bbc709/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java index 66c007d..d1c8082 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java @@ -67,25 +67,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory { LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class); CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class); - 
//HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class); ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class); ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class); - //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class); - - //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class); - //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class); - //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class); - //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class); - //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class); return new CarbondataConnector(lifeCycleManager, metadata, new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet, - //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader), classLoader - //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader), - //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader), - //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader), ); } catch (Exception e) { throw Throwables.propagate(e); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d3bbc709/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java index d938a3d..f2d594a 100755 --- 
a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java @@ -59,11 +59,11 @@ public class CarbondataMetadata implements ConnectorMetadata { } public List<String> listSchemaNamesInternal() { - List<String> ret; + List<String> schemaNameList; try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) { - ret = carbonTableReader.getSchemaNames(); + schemaNameList = carbonTableReader.getSchemaNames(); } - return ret; + return schemaNameList; } @Override @@ -109,27 +109,28 @@ public class CarbondataMetadata implements ConnectorMetadata { return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); } - private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) { - if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) { + private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) { + if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) { return null; } - CarbonTable cb = carbonTableReader.getTable(tableName); - if (cb == null) { + CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName); + if (carbonTable == null) { return null; } - List<ColumnMetadata> spiCols = new LinkedList<>(); - List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName()); + List<ColumnMetadata> columnsMetaList = new LinkedList<>(); + List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName()); for (CarbonColumn col : carbonColumns) { //show columns command will return these data - Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType()); - ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType); - spiCols.add(spiCol); + Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType()); + ColumnMetadata 
columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(), + columnType); + columnsMetaList.add(columnMeta); } //carbondata connector's table metadata - return new ConnectorTableMetadata(tableName, spiCols); + return new ConnectorTableMetadata(schemaTableName, columnsMetaList); } @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, @@ -141,6 +142,7 @@ public class CarbondataMetadata implements ConnectorMetadata { "tableHandle is not for this connector"); String schemaName = handle.getSchemaTableName().getSchemaName(); + if (!listSchemaNamesInternal().contains(schemaName)) { throw new SchemaNotFoundException(schemaName); } @@ -250,12 +252,6 @@ public class CarbondataMetadata implements ConnectorMetadata { return DateType.DATE; case TIMESTAMP: return TimestampType.TIMESTAMP; - - /*case DataType.MAP: - case DataType.ARRAY: - case DataType.STRUCT: - case DataType.NULL:*/ - default: return VarcharType.VARCHAR; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d3bbc709/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java index 8b087df..f0958c7 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java @@ -61,8 +61,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { @Inject public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) { - //this.config = requireNonNull(config, "config is null"); - //this.connector = requireNonNull(connector, "connector is null"); this.connectorId = 
requireNonNull(connectorId, "connectorId is null").toString(); this.carbonTableReader = reader; } @@ -72,9 +70,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { requireNonNull(split, "split is null"); requireNonNull(columns, "columns is null"); - CarbondataSplit cdSplit = + CarbondataSplit carbondataSplit = checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit"); - checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector"); + checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector"); String targetCols = ""; // Convert all columns handles @@ -95,7 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList())); CarbonTableCacheModel tableCacheModel = - carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName()); + carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName()); checkNotNull(tableCacheModel, "tableCacheModel should not be null"); checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null"); checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null"); @@ -107,10 +105,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider { QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable); // Push down filter - fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable); + fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable); // Return new record set - return new CarbondataRecordSet(targetTable, session, cdSplit, + return new CarbondataRecordSet(targetTable, session, carbondataSplit, handles.build(), queryModel); } 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/d3bbc709/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java ---------------------------------------------------------------------- diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java index bb482b0..b6e45d6 100755 --- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java +++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java @@ -52,18 +52,13 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.hadoop.CacheClient; import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil; -import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.thrift.TBase; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.net.URI; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -82,7 +77,7 @@ public class CarbonTableReader { private CarbonTableConfig config; private List<SchemaTableName> tableList; - private CarbonFile dbStore; + private CarbonFile carbonFileList; private FileFactory.FileType fileType; // A cache for Carbon reader @@ -98,10 +93,10 @@ public class CarbonTableReader { if (!cc.containsKey(table)) { try (ThreadContextClassLoader ignored = new ThreadContextClassLoader( FileFactory.class.getClassLoader())) { - if (dbStore == null) { + if (carbonFileList == null) { fileType = FileFactory.getFileType(config.getStorePath()); try { - dbStore = 
FileFactory.getCarbonFile(config.getStorePath(), fileType); + carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -126,11 +121,11 @@ public class CarbonTableReader { } }; - public boolean updateDbStore() { - if (dbStore == null) { + public boolean updateCarbonFile() { + if (carbonFileList == null) { fileType = FileFactory.getFileType(config.getStorePath()); try { - dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType); + carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -139,12 +134,12 @@ public class CarbonTableReader { } public List<String> updateSchemaList() { - updateDbStore(); + updateCarbonFile(); - if (dbStore != null) { - List<String> scs = - Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList()); - return scs; + if (carbonFileList != null) { + List<String> schemaList = + Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList()); + return schemaList; } else return ImmutableList.of(); } @@ -154,7 +149,7 @@ public class CarbonTableReader { } public Set<String> updateTableList(String dbName) { - List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())) + List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName())) .collect(Collectors.toList()); if (schema.size() > 0) { return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()) @@ -177,15 +172,14 @@ public class CarbonTableReader { public void updateSchemaTables() { // update logic determine later - if (dbStore == null) { + if (carbonFileList == null) { updateSchemaList(); } - tableList = new LinkedList<>(); - for (CarbonFile db : dbStore.listFiles()) { - if (!db.getName().endsWith(".mdt")) { - for (CarbonFile table : db.listFiles()) { - tableList.add(new 
SchemaTableName(db.getName(), table.getName())); + for (CarbonFile cf : carbonFileList.listFiles()) { + if (!cf.getName().endsWith(".mdt")) { + for (CarbonFile table : cf.listFiles()) { + tableList.add(new SchemaTableName(cf.getName(), table.getName())); } } }