Refactoring code for better organization. + Adding skeleton streams plugin.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/c74d75ce Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/c74d75ce Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/c74d75ce Branch: refs/heads/master Commit: c74d75ce4aebc2fe188793d77dc5bb22542c9c4a Parents: 58e95ca Author: Aditya <adi...@mapr.com> Authored: Fri Mar 4 15:17:36 2016 -0800 Committer: Aditya Kishore <a...@apache.org> Committed: Fri Sep 9 10:08:38 2016 -0700 ---------------------------------------------------------------------- .../exec/store/mapr/TableFormatMatcher.java | 76 +++ .../exec/store/mapr/TableFormatPlugin.java | 138 +++++ .../store/mapr/TableFormatPluginConfig.java | 38 ++ .../exec/store/mapr/db/MapRDBFormatMatcher.java | 42 ++ .../exec/store/mapr/db/MapRDBFormatPlugin.java | 82 +++ .../store/mapr/db/MapRDBFormatPluginConfig.java | 68 +++ .../exec/store/mapr/db/MapRDBGroupScan.java | 283 +++++++++ .../store/mapr/db/MapRDBPushFilterIntoScan.java | 205 +++++++ .../store/mapr/db/MapRDBScanBatchCreator.java | 65 +++ .../drill/exec/store/mapr/db/MapRDBSubScan.java | 125 ++++ .../exec/store/mapr/db/MapRDBSubScanSpec.java | 114 ++++ .../exec/store/mapr/db/MapRDBTableStats.java | 46 ++ .../exec/store/mapr/db/TabletFragmentInfo.java | 108 ++++ .../mapr/db/binary/BinaryTableGroupScan.java | 214 +++++++ .../db/binary/CompareFunctionsProcessor.java | 547 ++++++++++++++++++ .../mapr/db/binary/MapRDBFilterBuilder.java | 355 ++++++++++++ .../mapr/db/json/CompareFunctionsProcessor.java | 222 ++++++++ .../mapr/db/json/JsonConditionBuilder.java | 240 ++++++++ .../exec/store/mapr/db/json/JsonScanSpec.java | 109 ++++ .../store/mapr/db/json/JsonSubScanSpec.java | 112 ++++ .../store/mapr/db/json/JsonTableGroupScan.java | 183 ++++++ .../mapr/db/json/MaprDBJsonRecordReader.java | 569 +++++++++++++++++++ .../exec/store/mapr/db/util/CommonFns.java | 26 + .../mapr/streams/StreamsFormatMatcher.java | 42 ++ 
.../store/mapr/streams/StreamsFormatPlugin.java | 79 +++ .../mapr/streams/StreamsFormatPluginConfig.java | 39 ++ .../exec/store/maprdb/MapRDBFormatMatcher.java | 68 --- .../exec/store/maprdb/MapRDBFormatPlugin.java | 173 ------ .../store/maprdb/MapRDBFormatPluginConfig.java | 82 --- .../exec/store/maprdb/MapRDBGroupScan.java | 283 --------- .../store/maprdb/MapRDBPushFilterIntoScan.java | 205 ------- .../store/maprdb/MapRDBScanBatchCreator.java | 65 --- .../drill/exec/store/maprdb/MapRDBSubScan.java | 124 ---- .../exec/store/maprdb/MapRDBSubScanSpec.java | 114 ---- .../exec/store/maprdb/MapRDBTableStats.java | 46 -- .../exec/store/maprdb/TabletFragmentInfo.java | 108 ---- .../maprdb/binary/BinaryTableGroupScan.java | 216 ------- .../binary/CompareFunctionsProcessor.java | 547 ------------------ .../maprdb/binary/MapRDBFilterBuilder.java | 355 ------------ .../maprdb/json/CompareFunctionsProcessor.java | 222 -------- .../store/maprdb/json/JsonConditionBuilder.java | 240 -------- .../exec/store/maprdb/json/JsonScanSpec.java | 109 ---- .../exec/store/maprdb/json/JsonSubScanSpec.java | 112 ---- .../store/maprdb/json/JsonTableGroupScan.java | 184 ------ .../maprdb/json/MaprDBJsonRecordReader.java | 569 ------------------- .../drill/exec/store/maprdb/util/CommonFns.java | 26 - .../drill/maprdb/tests/MaprDBTestsSuite.java | 9 +- .../maprdb/tests/binary/TestMapRDBSimple.java | 2 +- 48 files changed, 4135 insertions(+), 3851 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java new file mode 100644 index 0000000..192e57d --- 
/dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatMatcher.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr; + +import java.io.IOException; + +import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.logical.DynamicDrillTable; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.hadoop.fs.FileStatus; + +import com.mapr.fs.MapRFileStatus; + +public abstract class TableFormatMatcher extends FormatMatcher { + + private final TableFormatPlugin plugin; + + public TableFormatMatcher(TableFormatPlugin plugin) { + this.plugin = plugin; + } + + @Override + public boolean supportDirectoryReads() { + return false; + } + + public DrillTable isReadable(DrillFileSystem fs, + FileSelection selection, FileSystemPlugin fsPlugin, + String storageEngineName, String userName) throws IOException { + FileStatus status = 
selection.getFirstPath(fs); + if (!isFileReadable(fs, status)) { + return null; + } + + return new DynamicDrillTable(fsPlugin, storageEngineName, userName, + new FormatSelection(getFormatPlugin().getConfig(), selection)); + } + + @Override + public boolean isFileReadable(DrillFileSystem fs, FileStatus status) throws IOException { + return (status instanceof MapRFileStatus) + && ((MapRFileStatus) status).isTable() + && isSupportedTable((MapRFileStatus) status); + } + + @Override + public TableFormatPlugin getFormatPlugin() { + return plugin; + } + + /** + * Returns true if the path pointed by the MapRFileStatus is a supported table + * by this format plugin. The path must point to a MapR table. + */ + protected abstract boolean isSupportedTable(MapRFileStatus status) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java new file mode 100644 index 0000000..b0131fd --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr; + +import static com.mapr.fs.jni.MapRConstants.MAPRFS_PREFIX; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Set; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.logical.FormatPluginConfig; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractWriter; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; +import org.apache.drill.exec.store.dfs.FormatPlugin; +import org.apache.hadoop.conf.Configuration; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.ImmutableSet; +import com.mapr.fs.MapRFileSystem; + +public abstract class TableFormatPlugin implements FormatPlugin { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(TableFormatPlugin.class); + + private final FileSystemConfig storageConfig; + private final TableFormatPluginConfig config; + private final Configuration fsConf; + private final DrillbitContext context; + private final String name; + + private volatile FileSystemPlugin storagePlugin; + private final MapRFileSystem maprfs; + + protected TableFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, + StoragePluginConfig storageConfig, TableFormatPluginConfig formatConfig) { + this.context = context; + this.config = formatConfig; + this.storageConfig = (FileSystemConfig) storageConfig; + this.fsConf = fsConf; + this.name = name == null ? "maprdb" : name; + try { + this.maprfs = new MapRFileSystem(); + getMaprFS().initialize(new URI(MAPRFS_PREFIX), fsConf); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean supportsRead() { + return true; + } + + @Override + public boolean supportsWrite() { + return false; + } + + @Override + public boolean supportsAutoPartitioning() { + return false; + } + + public Configuration getFsConf() { + return fsConf; + } + + @Override + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(); + } + + @Override + public AbstractWriter getWriter(PhysicalOperator child, String location, + List<String> partitionColumns) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FormatPluginConfig getConfig() { + return config; + } + + @Override + public StoragePluginConfig getStorageConfig() { + return storageConfig; + } + + @Override + public DrillbitContext getContext() { + return context; + } + + @Override + public String getName() { + return name; + } + + public synchronized FileSystemPlugin getStoragePlugin() { + if (this.storagePlugin == null) { + try { + this.storagePlugin = (FileSystemPlugin) (context.getStorage().getPlugin(storageConfig)); + } catch (ExecutionSetupException e) { + throw new RuntimeException(e); + } + } + return storagePlugin; + } + + @JsonIgnore + public MapRFileSystem getMaprFS() { + return maprfs; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java ---------------------------------------------------------------------- diff --git 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java new file mode 100644 index 0000000..904cdb9 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPluginConfig.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr; + +import org.apache.drill.common.logical.FormatPluginConfig; + +public abstract class TableFormatPluginConfig implements FormatPluginConfig { + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } else if (obj == null) { + return false; + } else if (getClass() != obj.getClass()) { + return false; + } + return impEquals(obj); + } + + protected abstract boolean impEquals(Object obj); + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java new file mode 100644 index 0000000..4a5d118 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr.db; + +import java.io.IOException; + +import org.apache.drill.exec.store.mapr.TableFormatMatcher; +import org.apache.drill.exec.store.mapr.TableFormatPlugin; + +import com.mapr.fs.MapRFileStatus; + +public class MapRDBFormatMatcher extends TableFormatMatcher { + + public MapRDBFormatMatcher(TableFormatPlugin plugin) { + super(plugin); + } + + @Override + protected boolean isSupportedTable(MapRFileStatus status) throws IOException { + return !getFormatPlugin() + .getMaprFS() + .getTableProperties(status.getPath()) + .getAttr() + .getIsMarlinTable(); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java new file mode 100644 index 0000000..9fe16e4 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatMatcher; +import org.apache.drill.exec.store.hbase.HBaseScanSpec; +import org.apache.drill.exec.store.mapr.TableFormatPlugin; +import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan; +import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec; +import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.collect.ImmutableSet; +import com.mapr.fs.tables.TableProperties; + +public class MapRDBFormatPlugin extends TableFormatPlugin { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class); + + private final MapRDBFormatMatcher matcher; + + public MapRDBFormatPlugin(String name, DrillbitContext context, Configuration fsConf, + StoragePluginConfig storageConfig, MapRDBFormatPluginConfig formatConfig) { + super(name, context, fsConf, storageConfig, formatConfig); + matcher = new MapRDBFormatMatcher(this); + } + + @Override + public FormatMatcher getMatcher() { + return matcher; + } + + @Override + @JsonIgnore + public Set<StoragePluginOptimizerRule> getOptimizerRules() { + return ImmutableSet.of(MapRDBPushFilterIntoScan.FILTER_ON_SCAN, MapRDBPushFilterIntoScan.FILTER_ON_PROJECT); + } + + @Override + public 
AbstractGroupScan getGroupScan(String userName, FileSelection selection, + List<SchemaPath> columns) throws IOException { + List<String> files = selection.getFiles(); + assert (files.size() == 1); + String tableName = files.get(0); + TableProperties props = getMaprFS().getTableProperties(new Path(tableName)); + + if (props.getAttr().getJson()) { + JsonScanSpec scanSpec = new JsonScanSpec(tableName, null/*condition*/); + return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns); + } else { + HBaseScanSpec scanSpec = new HBaseScanSpec(tableName); + return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java new file mode 100644 index 0000000..82b360c --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPluginConfig.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db; + +import org.apache.drill.exec.store.mapr.TableFormatPluginConfig; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("maprdb") @JsonInclude(Include.NON_DEFAULT) +public class MapRDBFormatPluginConfig extends TableFormatPluginConfig { + + private boolean allTextMode = false; + private boolean readAllNumbersAsDouble = false; + + @Override + public int hashCode() { + return 53; + } + + @Override + protected boolean impEquals(Object obj) { + MapRDBFormatPluginConfig other = (MapRDBFormatPluginConfig)obj; + if (readAllNumbersAsDouble != other.readAllNumbersAsDouble) { + return false; + } else if (allTextMode != other.allTextMode) { + return false; + } + + return true; + } + + public boolean isReadAllNumbersAsDouble() { + return readAllNumbersAsDouble; + } + + public boolean isAllTextMode() { + return allTextMode; + } + + @JsonProperty("allTextMode") + public void setAllTextMode(boolean mode) { + allTextMode = mode; + } + + @JsonProperty("readAllNumbersAsDouble") + public void setReadAllNumbersAsDouble(boolean read) { + readAllNumbersAsDouble = read; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java ---------------------------------------------------------------------- diff --git 
a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java new file mode 100644 index 0000000..8563b78 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr.db; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.PriorityQueue; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.EndpointAffinity; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.dfs.FileSystemConfig; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public abstract class MapRDBGroupScan extends AbstractGroupScan { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class); + + private FileSystemPlugin storagePlugin; + + private MapRDBFormatPlugin formatPlugin; + + protected MapRDBFormatPluginConfig formatPluginConfig; + + protected List<SchemaPath> columns; + + protected Map<Integer, List<MapRDBSubScanSpec>> endpointFragmentMapping; + + protected NavigableMap<TabletFragmentInfo, String> regionsToScan; + + private boolean filterPushedDown = false; + + private Stopwatch watch = Stopwatch.createUnstarted(); + + private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() { + @Override + public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) { + return list1.size() - list2.size(); + } + }; 
+ + private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR); + + public MapRDBGroupScan(MapRDBGroupScan that) { + super(that); + this.columns = that.columns; + this.formatPlugin = that.formatPlugin; + this.formatPluginConfig = that.formatPluginConfig; + this.storagePlugin = that.storagePlugin; + this.regionsToScan = that.regionsToScan; + this.filterPushedDown = that.filterPushedDown; + } + + public MapRDBGroupScan(FileSystemPlugin storagePlugin, + MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) { + super(userName); + this.storagePlugin = storagePlugin; + this.formatPlugin = formatPlugin; + this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig(); + this.columns = columns; + } + + @Override + public List<EndpointAffinity> getOperatorAffinity() { + watch.reset(); + watch.start(); + Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>(); + for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) { + endpointMap.put(ep.getAddress(), ep); + } + + Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>(); + for (String serverName : regionsToScan.values()) { + DrillbitEndpoint ep = endpointMap.get(serverName); + if (ep != null) { + EndpointAffinity affinity = affinityMap.get(ep); + if (affinity == null) { + affinityMap.put(ep, new EndpointAffinity(ep, 1)); + } else { + affinity.addAffinity(1); + } + } + } + logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000); + return Lists.newArrayList(affinityMap.values()); + } + + /** + * + * @param incomingEndpoints + */ + @Override + public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { + watch.reset(); + watch.start(); + + final int numSlots = incomingEndpoints.size(); + Preconditions.checkArgument(numSlots <= regionsToScan.size(), + String.format("Incoming endpoints %d 
is greater than number of scan regions %d", numSlots, regionsToScan.size())); + + /* + * Minimum/Maximum number of assignment per slot + */ + final int minPerEndpointSlot = (int) Math.floor((double)regionsToScan.size() / numSlots); + final int maxPerEndpointSlot = (int) Math.ceil((double)regionsToScan.size() / numSlots); + + /* + * initialize (endpoint index => HBaseSubScanSpec list) map + */ + endpointFragmentMapping = Maps.newHashMapWithExpectedSize(numSlots); + + /* + * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list + */ + Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap(); + + /* + * Initialize these two maps + */ + for (int i = 0; i < numSlots; ++i) { + endpointFragmentMapping.put(i, new ArrayList<MapRDBSubScanSpec>(maxPerEndpointSlot)); + String hostname = incomingEndpoints.get(i).getAddress(); + Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname); + if (hostIndexQueue == null) { + hostIndexQueue = Lists.newLinkedList(); + endpointHostIndexListMap.put(hostname, hostIndexQueue); + } + hostIndexQueue.add(i); + } + + Set<Entry<TabletFragmentInfo, String>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet()); + + /* + * First, we assign regions which are hosted on region servers running on drillbit endpoints + */ + for (Iterator<Entry<TabletFragmentInfo, String>> regionsIterator = regionsToAssignSet.iterator(); regionsIterator.hasNext(); /*nothing*/) { + Entry<TabletFragmentInfo, String> regionEntry = regionsIterator.next(); + /* + * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region + */ + Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue()); + if (endpointIndexlist != null) { + Integer slotIndex = endpointIndexlist.poll(); + List<MapRDBSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex); + endpointSlotScanList.add(getSubScanSpec(regionEntry.getKey())); 
+ // add to the tail of the slot list, to add more later in round robin fashion + endpointIndexlist.offer(slotIndex); + // this region has been assigned + regionsIterator.remove(); + } + } + + /* + * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more. + */ + PriorityQueue<List<MapRDBSubScanSpec>> minHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR); + PriorityQueue<List<MapRDBSubScanSpec>> maxHeap = new PriorityQueue<List<MapRDBSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV); + for(List<MapRDBSubScanSpec> listOfScan : endpointFragmentMapping.values()) { + if (listOfScan.size() < minPerEndpointSlot) { + minHeap.offer(listOfScan); + } else if (listOfScan.size() > minPerEndpointSlot){ + maxHeap.offer(listOfScan); + } + } + + /* + * Now, let's process any regions which remain unassigned and assign them to slots with minimum number of assignments. + */ + if (regionsToAssignSet.size() > 0) { + for (Entry<TabletFragmentInfo, String> regionEntry : regionsToAssignSet) { + List<MapRDBSubScanSpec> smallestList = minHeap.poll(); + smallestList.add(getSubScanSpec(regionEntry.getKey())); + if (smallestList.size() < maxPerEndpointSlot) { + minHeap.offer(smallestList); + } + } + } + + /* + * While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more. 
+ */ + while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) { + List<MapRDBSubScanSpec> smallestList = (List<MapRDBSubScanSpec>) minHeap.poll(); + List<MapRDBSubScanSpec> largestList = (List<MapRDBSubScanSpec>) maxHeap.poll(); + smallestList.add(largestList.remove(largestList.size()-1)); + if (largestList.size() > minPerEndpointSlot) { + maxHeap.offer(largestList); + } + if (smallestList.size() < minPerEndpointSlot) { + minHeap.offer(smallestList); + } + } + + /* no slot should be empty at this point */ + assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format( + "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.", + incomingEndpoints, endpointFragmentMapping.toString()); + + logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}", + watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString()); + } + + @Override + public int getMaxParallelizationWidth() { + return regionsToScan.size(); + } + + @JsonIgnore + public MapRDBFormatPlugin getFormatPlugin() { + return formatPlugin; + } + + @Override + public String getDigest() { + return toString(); + } + + @JsonProperty("storage") + public FileSystemConfig getStorageConfig() { + return (FileSystemConfig) storagePlugin.getConfig(); + } + + @JsonIgnore + public FileSystemPlugin getStoragePlugin(){ + return storagePlugin; + } + + @JsonProperty + public List<SchemaPath> getColumns() { + return columns; + } + + @JsonIgnore + public boolean canPushdownProjects(List<SchemaPath> columns) { + return true; + } + + @JsonIgnore + public void setFilterPushedDown(boolean b) { + this.filterPushedDown = true; + } + + @JsonIgnore + public boolean isFilterPushedDown() { + return filterPushedDown; + } + + protected abstract MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo key); + +} 
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java new file mode 100644 index 0000000..7292182 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr.db; + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelOptRuleOperand; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.exec.planner.logical.DrillOptiq; +import org.apache.drill.exec.planner.logical.DrillParseContext; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.apache.drill.exec.planner.physical.FilterPrel; +import org.apache.drill.exec.planner.physical.PrelUtil; +import org.apache.drill.exec.planner.physical.ProjectPrel; +import org.apache.drill.exec.planner.physical.ScanPrel; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.hbase.HBaseScanSpec; +import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan; +import org.apache.drill.exec.store.mapr.db.binary.MapRDBFilterBuilder; +import org.apache.drill.exec.store.mapr.db.json.JsonConditionBuilder; +import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec; +import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan; + +import com.google.common.collect.ImmutableList; + +public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class); + + private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) { + super(operand, description); + } + + public static final StoragePluginOptimizerRule FILTER_ON_SCAN = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)), "MapRDBPushFilterIntoScan:Filter_On_Scan") { + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(1); + final FilterPrel filter = (FilterPrel) call.rel(0); + final RexNode condition = 
filter.getCondition(); + + if (scan.getGroupScan() instanceof BinaryTableGroupScan) { + BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan(); + doPushFilterIntoBinaryGroupScan(call, filter, null, scan, groupScan, condition); + } else { + assert(scan.getGroupScan() instanceof JsonTableGroupScan); + JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan(); + doPushFilterIntoJsonGroupScan(call, filter, null, scan, groupScan, condition); + } + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(1); + if (scan.getGroupScan() instanceof BinaryTableGroupScan || + scan.getGroupScan() instanceof JsonTableGroupScan) { + return super.matches(call); + } + return false; + } + }; + + public static final StoragePluginOptimizerRule FILTER_ON_PROJECT = new MapRDBPushFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.some(ProjectPrel.class, RelOptHelper.any(ScanPrel.class))), "MapRDBPushFilterIntoScan:Filter_On_Project") { + + @Override + public void onMatch(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(2); + final ProjectPrel project = (ProjectPrel) call.rel(1); + final FilterPrel filter = (FilterPrel) call.rel(0); + + // convert the filter to one that references the child of the project + final RexNode condition = RelOptUtil.pushPastProject(filter.getCondition(), project); + + if (scan.getGroupScan() instanceof BinaryTableGroupScan) { + BinaryTableGroupScan groupScan = (BinaryTableGroupScan)scan.getGroupScan(); + doPushFilterIntoBinaryGroupScan(call, filter, project, scan, groupScan, condition); + } else { + assert(scan.getGroupScan() instanceof JsonTableGroupScan); + JsonTableGroupScan groupScan = (JsonTableGroupScan)scan.getGroupScan(); + doPushFilterIntoJsonGroupScan(call, filter, project, scan, groupScan, condition); + } + } + + @Override + public boolean matches(RelOptRuleCall call) { + final ScanPrel scan = (ScanPrel) call.rel(2); + if 
(scan.getGroupScan() instanceof BinaryTableGroupScan || + scan.getGroupScan() instanceof JsonTableGroupScan) { + return super.matches(call); + } + return false; + } + }; + + protected void doPushFilterIntoJsonGroupScan(RelOptRuleCall call, + FilterPrel filter, final ProjectPrel project, ScanPrel scan, + JsonTableGroupScan groupScan, RexNode condition) { + + if (groupScan.isFilterPushedDown()) { + /* + * The rule can get triggered again due to the transformed "scan => filter" sequence + * created by the earlier execution of this rule when we could not do a complete + * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon + * this flag to not do a re-processing of the rule on the already transformed call. + */ + return; + } + + LogicalExpression conditionExp = null; + try { + conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); + } catch (ClassCastException e) { + // MD-771 bug in DrillOptiq.toDrill() causes filter condition on ITEM operator to throw ClassCastException + // For such cases, we return without pushdown + return; + } + final JsonConditionBuilder jsonConditionBuilder = new JsonConditionBuilder(groupScan, conditionExp); + final JsonScanSpec newScanSpec = jsonConditionBuilder.parseTree(); + if (newScanSpec == null) { + return; //no filter pushdown ==> No transformation. + } + + final JsonTableGroupScan newGroupsScan = new JsonTableGroupScan(groupScan.getUserName(), + groupScan.getStoragePlugin(), + groupScan.getFormatPlugin(), + newScanSpec, + groupScan.getColumns()); + newGroupsScan.setFilterPushedDown(true); + + final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); + + // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. + final RelNode childRel = project == null ? 
newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; + + if (jsonConditionBuilder.isAllExpressionsConverted()) { + /* + * Since we could convert the entire filter condition expression into an HBase filter, + * we can eliminate the filter operator altogether. + */ + call.transformTo(childRel); + } else { + call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); + } + } + + protected void doPushFilterIntoBinaryGroupScan(final RelOptRuleCall call, + final FilterPrel filter, + final ProjectPrel project, + final ScanPrel scan, + final BinaryTableGroupScan groupScan, + final RexNode condition) { + + if (groupScan.isFilterPushedDown()) { + /* + * The rule can get triggered again due to the transformed "scan => filter" sequence + * created by the earlier execution of this rule when we could not do a complete + * conversion of Optiq Filter's condition to HBase Filter. In such cases, we rely upon + * this flag to not do a re-processing of the rule on the already transformed call. + */ + return; + } + + final LogicalExpression conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition); + final MapRDBFilterBuilder maprdbFilterBuilder = new MapRDBFilterBuilder(groupScan, conditionExp); + final HBaseScanSpec newScanSpec = maprdbFilterBuilder.parseTree(); + if (newScanSpec == null) { + return; //no filter pushdown ==> No transformation. + } + + final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(), + groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns()); + newGroupsScan.setFilterPushedDown(true); + + final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); + + // Depending on whether is a project in the middle, assign either scan or copy of project to childRel. + final RelNode childRel = project == null ? 
newScanPrel : project.copy(project.getTraitSet(), ImmutableList.of((RelNode)newScanPrel));; + + if (maprdbFilterBuilder.isAllExpressionsConverted()) { + /* + * Since we could convert the entire filter condition expression into an HBase filter, + * we can eliminate the filter operator altogether. + */ + call.transformTo(childRel); + } else { + call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(childRel))); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java new file mode 100644 index 0000000..1d51223 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr.db; + +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.ScanBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.hbase.HBaseRecordReader; +import org.apache.drill.exec.store.hbase.HBaseSubScan.HBaseSubScanSpec; +import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan; +import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan>{ + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class); + + @Override + public ScanBatch getBatch(FragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException { + Preconditions.checkArgument(children.isEmpty()); + List<RecordReader> readers = Lists.newArrayList(); + Configuration conf = HBaseConfiguration.create(); + for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){ + try { + if (BinaryTableGroupScan.TABLE_BINARY.equals(subScan.getTableType())) { + readers.add(new HBaseRecordReader(conf, getHBaseSubScanSpec(scanSpec), subScan.getColumns(), context)); + } else { + readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPluginConfig(), subScan.getColumns(), context)); + } + } catch (Exception e1) { + throw new ExecutionSetupException(e1); + } + } + return new ScanBatch(subScan, context, readers.iterator()); + } + + private HBaseSubScanSpec getHBaseSubScanSpec(MapRDBSubScanSpec scanSpec) { + return new 
HBaseSubScanSpec(scanSpec.getTableName(), scanSpec.getRegionServer(), + scanSpec.getStartRow(), scanSpec.getStopRow(), scanSpec.getSerializedFilter(), null); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java new file mode 100644 index 0000000..dea6867 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr.db; + +import java.util.Iterator; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.FileSystemPlugin; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; + +// Class containing information for reading a single HBase region +@JsonTypeName("maprdb-sub-scan") +public class MapRDBSubScan extends AbstractBase implements SubScan { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class); + + @JsonProperty + public final StoragePluginConfig storage; + @JsonIgnore + private final MapRDBFormatPluginConfig fsFormatPluginConfig; + private final FileSystemPlugin fsStoragePlugin; + private final List<MapRDBSubScanSpec> regionScanSpecList; + private final List<SchemaPath> columns; + private final String tableType; + + @JsonCreator + public MapRDBSubScan(@JacksonInject StoragePluginRegistry registry, + @JsonProperty("userName") String userName, + @JsonProperty("formatPluginConfig") MapRDBFormatPluginConfig formatPluginConfig, + @JsonProperty("storage") StoragePluginConfig storage, + @JsonProperty("regionScanSpecList") List<MapRDBSubScanSpec> regionScanSpecList, + @JsonProperty("columns") 
List<SchemaPath> columns, + @JsonProperty("tableType") String tableType) throws ExecutionSetupException { + super(userName); + this.fsFormatPluginConfig = formatPluginConfig; + this.fsStoragePlugin = (FileSystemPlugin) registry.getPlugin(storage); + this.regionScanSpecList = regionScanSpecList; + this.storage = storage; + this.columns = columns; + this.tableType = tableType; + } + + public MapRDBSubScan(String userName, MapRDBFormatPluginConfig formatPluginConfig, FileSystemPlugin storagePlugin, StoragePluginConfig config, + List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) { + super(userName); + fsFormatPluginConfig = formatPluginConfig; + fsStoragePlugin = storagePlugin; + storage = config; + this.regionScanSpecList = maprSubScanSpecs; + this.columns = columns; + this.tableType = tableType; + } + + public List<MapRDBSubScanSpec> getRegionScanSpecList() { + return regionScanSpecList; + } + + public List<SchemaPath> getColumns() { + return columns; + } + + @Override + public boolean isExecutable() { + return false; + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitSubScan(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { + Preconditions.checkArgument(children.isEmpty()); + return new MapRDBSubScan(getUserName(), fsFormatPluginConfig, fsStoragePlugin, storage, regionScanSpecList, columns, tableType); + } + + @Override + public Iterator<PhysicalOperator> iterator() { + return ImmutableSet.<PhysicalOperator>of().iterator(); + } + + @Override + public int getOperatorType() { + return 1001; + } + + public String getTableType() { + return tableType; + } + + public MapRDBFormatPluginConfig getFormatPluginConfig() { + return fsFormatPluginConfig; + } + +} 
http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java new file mode 100644 index 0000000..3ffe47c --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScanSpec.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.mapr.db; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.mapr.fs.jni.MapRConstants; +import com.mapr.org.apache.hadoop.hbase.util.Bytes; + +public class MapRDBSubScanSpec { + + protected String tableName; + protected String regionServer; + protected byte[] startRow; + protected byte[] stopRow; + protected byte[] serializedFilter; + + @JsonCreator + public MapRDBSubScanSpec(@JsonProperty("tableName") String tableName, + @JsonProperty("regionServer") String regionServer, + @JsonProperty("startRow") byte[] startRow, + @JsonProperty("stopRow") byte[] stopRow, + @JsonProperty("serializedFilter") byte[] serializedFilter, + @JsonProperty("filterString") String filterString) { + if (serializedFilter != null && filterString != null) { + throw new IllegalArgumentException("The parameters 'serializedFilter' or 'filterString' cannot be specified at the same time."); + } + this.tableName = tableName; + this.regionServer = regionServer; + this.startRow = startRow; + this.stopRow = stopRow; + this.serializedFilter = serializedFilter; + } + + /* package */ MapRDBSubScanSpec() { + // empty constructor, to be used with builder pattern; + } + + public String getTableName() { + return tableName; + } + + public MapRDBSubScanSpec setTableName(String tableName) { + this.tableName = tableName; + return this; + } + + public String getRegionServer() { + return regionServer; + } + + public MapRDBSubScanSpec setRegionServer(String regionServer) { + this.regionServer = regionServer; + return this; + } + + /** + * @return the raw (not-encoded) start row key for this sub-scan + */ + public byte[] getStartRow() { + return startRow == null ? 
MapRConstants.EMPTY_BYTE_ARRAY: startRow; + } + + public MapRDBSubScanSpec setStartRow(byte[] startRow) { + this.startRow = startRow; + return this; + } + + /** + * @return the raw (not-encoded) stop row key for this sub-scan + */ + public byte[] getStopRow() { + return stopRow == null ? MapRConstants.EMPTY_BYTE_ARRAY : stopRow; + } + + public MapRDBSubScanSpec setStopRow(byte[] stopRow) { + this.stopRow = stopRow; + return this; + } + + public byte[] getSerializedFilter() { + return serializedFilter; + } + + public MapRDBSubScanSpec setSerializedFilter(byte[] serializedFilter) { + this.serializedFilter = serializedFilter; + return this; + } + + @Override + public String toString() { + return "MapRDBSubScanSpec [tableName=" + tableName + + ", startRow=" + (startRow == null ? null : Bytes.toStringBinary(startRow)) + + ", stopRow=" + (stopRow == null ? null : Bytes.toStringBinary(stopRow)) + + ", filter=" + (getSerializedFilter() == null ? null : Bytes.toBase64(getSerializedFilter())) + + ", regionServer=" + regionServer + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java new file mode 100644 index 0000000..162776c --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBTableStats.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.mapr.TableMappingRulesFactory; + +import com.mapr.fs.hbase.HBaseAdminImpl; + +public class MapRDBTableStats { + private static volatile HBaseAdminImpl admin = null; + + private long numRows; + + public MapRDBTableStats(Configuration conf, String tablePath) throws Exception { + if (admin == null) { + synchronized (MapRDBTableStats.class) { + if (admin == null) { + Configuration config = conf; + admin = new HBaseAdminImpl(config, TableMappingRulesFactory.create(conf)); + } + } + } + numRows = admin.getNumRows(tablePath); + } + + public long getNumRows() { + return numRows; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java new file mode 100644 index 0000000..e71c67c --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/TabletFragmentInfo.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.mapr.db; + +import org.apache.hadoop.hbase.HRegionInfo; + +import com.mapr.db.impl.TabletInfoImpl; + +public class TabletFragmentInfo implements Comparable<TabletFragmentInfo> { + + final private HRegionInfo regionInfo; + final private TabletInfoImpl tabletInfoImpl; + + public TabletFragmentInfo(HRegionInfo regionInfo) { + this(null, regionInfo); + } + + public TabletFragmentInfo(TabletInfoImpl tabletInfoImpl) { + this(tabletInfoImpl, null); + } + + TabletFragmentInfo(TabletInfoImpl tabletInfoImpl, HRegionInfo regionInfo) { + this.regionInfo = regionInfo; + this.tabletInfoImpl = tabletInfoImpl; + } + + public HRegionInfo getRegionInfo() { + return regionInfo; + } + + public TabletInfoImpl getTabletInfoImpl() { + return tabletInfoImpl; + } + + public boolean containsRow(byte[] row) { + return tabletInfoImpl != null ? tabletInfoImpl.containsRow(row) : + regionInfo.containsRow(row); + } + + public byte[] getStartKey() { + return tabletInfoImpl != null ? tabletInfoImpl.getStartRow() : + regionInfo.getStartKey(); + } + + public byte[] getEndKey() { + return tabletInfoImpl != null ? 
tabletInfoImpl.getStopRow() : + regionInfo.getEndKey(); + } + + @Override + public int compareTo(TabletFragmentInfo o) { + return tabletInfoImpl != null ? tabletInfoImpl.compareTo(o.tabletInfoImpl) : + regionInfo.compareTo(o.regionInfo); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((regionInfo == null) ? 0 : regionInfo.hashCode()); + result = prime * result + ((tabletInfoImpl == null) ? 0 : tabletInfoImpl.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + TabletFragmentInfo other = (TabletFragmentInfo) obj; + if (regionInfo == null) { + if (other.regionInfo != null) + return false; + } else if (!regionInfo.equals(other.regionInfo)) + return false; + if (tabletInfoImpl == null) { + if (other.tabletInfoImpl != null) + return false; + } else if (!tabletInfoImpl.equals(other.tabletInfoImpl)) + return false; + return true; + } + + @Override + public String toString() { + return "TabletFragmentInfo [regionInfo=" + regionInfo + ", tabletInfoImpl=" + tabletInfoImpl + + "]"; + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/c74d75ce/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java ---------------------------------------------------------------------- diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java new file mode 100644 index 0000000..a597995 --- /dev/null +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.mapr.db.binary;

import static org.apache.drill.exec.store.mapr.db.util.CommonFns.isNullOrEmpty;

import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
import org.apache.drill.exec.store.hbase.HBaseScanSpec;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.MapRDBGroupScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;

/**
 * Group scan over a binary table, serialized under the type name
 * {@code "maprdb-binary-scan"}.
 *
 * <p>On construction the scan opens the table named by its {@link HBaseScanSpec},
 * records the table descriptor and table statistics, and builds the map of
 * regions that have to be scanned for the requested start/stop row range.</p>
 */
@JsonTypeName("maprdb-binary-scan")
public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class);

  /** Table type tag handed to every {@link MapRDBSubScan} created by this group scan. */
  public static final String TABLE_BINARY = "binary";

  private HBaseScanSpec hbaseScanSpec;

  private HTableDescriptor hTableDesc;

  private MapRDBTableStats tableStats;

  /**
   * Deserialization constructor. Resolves the storage and format plugins through
   * the injected registry and delegates to the plugin-based constructor.
   *
   * @param userName            user on whose behalf the scan runs
   * @param scanSpec            table name, row range and filter to scan
   * @param storagePluginConfig configuration used to look up the {@link FileSystemPlugin}
   * @param formatPluginConfig  configuration used to look up the {@link MapRDBFormatPlugin}
   * @param columns             columns to project
   * @param pluginRegistry      registry injected by Jackson
   * @throws IOException             declared for callers; propagated from plugin lookup
   * @throws ExecutionSetupException if a plugin cannot be obtained from the registry
   */
  @JsonCreator
  public BinaryTableGroupScan(@JsonProperty("userName") final String userName,
                              @JsonProperty("hbaseScanSpec") HBaseScanSpec scanSpec,
                              @JsonProperty("storage") FileSystemConfig storagePluginConfig,
                              @JsonProperty("format") MapRDBFormatPluginConfig formatPluginConfig,
                              @JsonProperty("columns") List<SchemaPath> columns,
                              @JacksonInject StoragePluginRegistry pluginRegistry) throws IOException, ExecutionSetupException {
    this(userName,
        (FileSystemPlugin) pluginRegistry.getPlugin(storagePluginConfig),
        (MapRDBFormatPlugin) pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
        scanSpec, columns);
  }

  /**
   * Creates the scan from already-resolved plugins and immediately loads the
   * region layout of the target table.
   *
   * @param userName      user on whose behalf the scan runs
   * @param storagePlugin storage plugin backing this scan
   * @param formatPlugin  format plugin backing this scan
   * @param scanSpec      table name, row range and filter to scan
   * @param columns       columns to project
   * @throws DrillRuntimeException if the region information cannot be read
   */
  public BinaryTableGroupScan(String userName, FileSystemPlugin storagePlugin,
      MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
    super(storagePlugin, formatPlugin, columns, userName);
    this.hbaseScanSpec = scanSpec;
    init();
  }

  /**
   * Private constructor, used for cloning. The scan spec, endpoint mapping,
   * table descriptor and table statistics are shared with the source instance,
   * so the table is not contacted again.
   *
   * @param that The BinaryTableGroupScan to clone
   */
  private BinaryTableGroupScan(BinaryTableGroupScan that) {
    super(that);
    this.hbaseScanSpec = that.hbaseScanSpec;
    this.endpointFragmentMapping = that.endpointFragmentMapping;
    this.hTableDesc = that.hTableDesc;
    this.tableStats = that.tableStats;
  }

  /**
   * Returns a copy of this scan that projects the given columns instead.
   *
   * @param columns the new projection
   * @return the cloned scan
   */
  @Override
  public GroupScan clone(List<SchemaPath> columns) {
    BinaryTableGroupScan newScan = new BinaryTableGroupScan(this);
    newScan.columns = columns;
    newScan.verifyColumns();
    return newScan;
  }

  /**
   * Reads table statistics, the table descriptor and the region locations, then
   * populates {@code regionsToScan} with every region from the one containing
   * the start row (if a start row is set) through the one containing the stop
   * row (if a stop row is set).
   *
   * @throws DrillRuntimeException wrapping any failure while reading the table
   */
  private void init() {
    logger.debug("Getting region locations");
    try {
      Configuration conf = HBaseConfiguration.create();
      HTable table = new HTable(conf, hbaseScanSpec.getTableName());
      NavigableMap<HRegionInfo, ServerName> regionsMap;
      try {
        tableStats = new MapRDBTableStats(conf, hbaseScanSpec.getTableName());
        this.hTableDesc = table.getTableDescriptor();
        regionsMap = table.getRegionLocations();
      } finally {
        // Release the table handle even when one of the lookups above fails.
        table.close();
      }

      final byte[] startRow = hbaseScanSpec.getStartRow();
      final byte[] stopRow = hbaseScanSpec.getStopRow();

      boolean foundStartRegion = false;
      regionsToScan = new TreeMap<TabletFragmentInfo, String>();
      for (Entry<HRegionInfo, ServerName> mapEntry : regionsMap.entrySet()) {
        HRegionInfo regionInfo = mapEntry.getKey();
        // Skip leading regions until the one holding the start row is reached.
        if (!foundStartRegion && !isNullOrEmpty(startRow) && !regionInfo.containsRow(startRow)) {
          continue;
        }
        foundStartRegion = true;
        regionsToScan.put(new TabletFragmentInfo(regionInfo), mapEntry.getValue().getHostname());
        // The region holding the stop row is the last one that needs scanning.
        if (!isNullOrEmpty(stopRow) && regionInfo.containsRow(stopRow)) {
          break;
        }
      }
    } catch (Exception e) {
      throw new DrillRuntimeException("Error getting region info for table: " + hbaseScanSpec.getTableName(), e);
    }
    verifyColumns();
  }

  /**
   * Column validation hook. Currently a no-op: the check that every projected
   * column family exists in the table is disabled and kept below for reference.
   */
  private void verifyColumns() {
    /*
    if (columns != null) {
      for (SchemaPath column : columns) {
        if (!(column.equals(ROW_KEY_PATH) || hTableDesc.hasFamily(HBaseUtils.getBytes(column.getRootSegment().getPath())))) {
          DrillRuntimeException.format("The column family '%s' does not exist in HBase table: %s .",
              column.getRootSegment().getPath(), hTableDesc.getNameAsString());
        }
      }
    }
    */
  }

  /**
   * Builds the sub-scan spec for one tablet. The scan's own start/stop row is
   * used when it is set and falls inside the tablet; otherwise the tablet's own
   * start/end key bounds the sub-scan.
   *
   * @param tfi the tablet to scan
   * @return the sub-scan spec for that tablet
   */
  protected MapRDBSubScanSpec getSubScanSpec(TabletFragmentInfo tfi) {
    HBaseScanSpec spec = hbaseScanSpec;
    MapRDBSubScanSpec subScanSpec = new MapRDBSubScanSpec(
        spec.getTableName(),
        regionsToScan.get(tfi),
        (!isNullOrEmpty(spec.getStartRow()) && tfi.containsRow(spec.getStartRow())) ? spec.getStartRow() : tfi.getStartKey(),
        (!isNullOrEmpty(spec.getStopRow()) && tfi.containsRow(spec.getStopRow())) ? spec.getStopRow() : tfi.getEndKey(),
        spec.getSerializedFilter(),
        null);
    return subScanSpec;
  }

  /**
   * Creates the sub-scan for one minor fragment from the endpoint mapping.
   *
   * @param minorFragmentId index into the endpoint/fragment mapping
   * @return the sub-scan assigned to that fragment
   */
  @Override
  public MapRDBSubScan getSpecificScan(int minorFragmentId) {
    assert minorFragmentId < endpointFragmentMapping.size() : String.format(
        "Mappings length [%d] should be greater than minor fragment id [%d] but it isn't.", endpointFragmentMapping.size(),
        minorFragmentId);
    return new MapRDBSubScan(getUserName(), formatPluginConfig, getStoragePlugin(), getStoragePlugin().getConfig(),
        endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
  }

  /**
   * Estimates the scan cost. The row count is the table's row count, halved when
   * a filter is present; the data size assumes 10 bytes per column and 100
   * columns when no projection is given.
   *
   * @return the estimated statistics, flagged as not an exact row count
   */
  @Override
  public ScanStats getScanStats() {
    //TODO: look at stats for this.
    long rowCount = (long) ((hbaseScanSpec.getFilter() != null ? .5 : 1) * tableStats.getNumRows());
    int avgColumnSize = 10;
    int numColumns = (columns == null || columns.isEmpty()) ? 100 : columns.size();
    return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, avgColumnSize * numColumns * rowCount);
  }

  /**
   * Returns a copy of this scan; a scan is a leaf operator and accepts no children.
   *
   * @param children must be empty
   * @return a clone of this scan
   * @throws IllegalArgumentException if {@code children} is not empty
   */
  @Override
  @JsonIgnore
  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
    Preconditions.checkArgument(children.isEmpty());
    return new BinaryTableGroupScan(this);
  }

  /**
   * @return a freshly created HBase configuration
   */
  @JsonIgnore
  public Configuration getHBaseConf() {
    return HBaseConfiguration.create();
  }

  /**
   * @return the name of the table named by the scan spec
   */
  @JsonIgnore
  public String getTableName() {
    return getHBaseScanSpec().getTableName();
  }

  @Override
  public String toString() {
    return "BinaryTableGroupScan [ScanSpec="
        + hbaseScanSpec + ", columns="
        + columns + "]";
  }

  /**
   * @return the scan spec this group scan was created with
   */
  @JsonProperty
  public HBaseScanSpec getHBaseScanSpec() {
    return hbaseScanSpec;
  }

}