http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java new file mode 100644 index 0000000..ac3b199 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/impl/StoreServiceImpl.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.rpc.impl; + +import java.io.IOException; + +import org.apache.carbondata.common.annotations.InterfaceAudience; +import org.apache.carbondata.store.rpc.StoreService; +import org.apache.carbondata.store.rpc.model.BaseResponse; +import org.apache.carbondata.store.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; +import org.apache.carbondata.store.rpc.model.ShutdownRequest; +import org.apache.carbondata.store.rpc.model.ShutdownResponse; +import org.apache.carbondata.store.worker.Worker; + +import org.apache.hadoop.ipc.ProtocolSignature; + +@InterfaceAudience.Internal +public class StoreServiceImpl implements StoreService { + + private Worker worker; + RequestHandler handler; + + public StoreServiceImpl(Worker worker) { + this.worker = worker; + this.handler = new RequestHandler(worker.getConf(), worker.getHadoopConf()); + } + + @Override + public BaseResponse loadData(LoadDataRequest request) { + return handler.handleLoadData(request); + } + + @Override + public QueryResponse query(QueryRequest request) { + return handler.handleSearch(request); + } + + @Override + public ShutdownResponse shutdown(ShutdownRequest request) { + return handler.handleShutdown(request); + } + + @Override + public long getProtocolVersion(String protocol, long clientVersion) throws IOException { + return versionID; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, + int clientMethodsHash) throws IOException { + return null; + } + + public Worker getWorker() { + return worker; + } + + public void setWorker(Worker worker) { + this.worker = worker; + } +}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java new file mode 100644 index 0000000..d826b32 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/BaseResponse.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.common.annotations.InterfaceAudience; + +import org.apache.hadoop.io.Writable; + +@InterfaceAudience.Internal +public class BaseResponse implements Serializable, Writable { + private int status; + private String message; + + public BaseResponse() { + } + + public BaseResponse(int status, String message) { + this.status = status; + this.message = message; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(status); + out.writeUTF(message); + } + + @Override + public void readFields(DataInput in) throws IOException { + status = in.readInt(); + message = in.readUTF(); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java new file mode 100644 index 0000000..e79fad2 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/LoadDataRequest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.rpc.model; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; + +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; + +public class LoadDataRequest implements Serializable, Writable { + + private CarbonLoadModel model; + + public LoadDataRequest() { + } + + public LoadDataRequest(CarbonLoadModel model) { + this.model = model; + } + + public CarbonLoadModel getModel() { + return model; + } + + public void setModel(CarbonLoadModel model) { + this.model = model; + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeCompressedByteArray(out, StoreUtil.serialize(model)); + } + + @Override + public void readFields(DataInput in) throws IOException { + byte[] bytes = WritableUtils.readCompressedByteArray(in); + model = (CarbonLoadModel) StoreUtil.deserialize(bytes); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java 
b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java index 033f1a5..7ad9210 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/QueryResponse.java @@ -29,19 +29,17 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @InterfaceAudience.Internal -public class QueryResponse implements Serializable, Writable { +public class QueryResponse extends BaseResponse implements Serializable, Writable { private int queryId; - private int status; - private String message; private Object[][] rows; public QueryResponse() { + super(); } public QueryResponse(int queryId, int status, String message, Object[][] rows) { + super(status, message); this.queryId = queryId; - this.status = status; - this.message = message; this.rows = rows; } @@ -49,13 +47,6 @@ public class QueryResponse implements Serializable, Writable { return queryId; } - public int getStatus() { - return status; - } - - public String getMessage() { - return message; - } public Object[][] getRows() { return rows; @@ -63,17 +54,15 @@ public class QueryResponse implements Serializable, Writable { @Override public void write(DataOutput out) throws IOException { + super.write(out); out.writeInt(queryId); - out.writeInt(status); - out.writeUTF(message); WritableUtils.writeCompressedByteArray(out, ObjectSerializationUtil.serialize(rows)); } @Override public void readFields(DataInput in) throws IOException { + super.readFields(in); queryId = in.readInt(); - status = in.readInt(); - message = in.readUTF(); try { rows = (Object[][])ObjectSerializationUtil.deserialize( WritableUtils.readCompressedByteArray(in)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java ---------------------------------------------------------------------- diff --git 
a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java index 894948b..2131e3b 100644 --- a/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java +++ b/store/core/src/main/java/org/apache/carbondata/store/rpc/model/RegisterWorkerRequest.java @@ -66,4 +66,8 @@ public class RegisterWorkerRequest implements Serializable, Writable { port = in.readInt(); cores = in.readInt(); } + + @Override public String toString() { + return "RegisterWorkerRequest{" + "hostAddress='" + hostAddress + '\'' + ", port=" + port + '}'; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java new file mode 100644 index 0000000..65d0786 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Schedulable.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.scheduler; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.carbondata.store.rpc.StoreService; + +public class Schedulable { + + private String id; + private String address; + private int port; + private int cores; + public StoreService service; + public AtomicInteger workload; + + public Schedulable(String id, String address, int port, int cores, StoreService service) { + this.id = id; + this.address = address; + this.port = port; + this.cores = cores; + this.service = service; + this.workload = new AtomicInteger(); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getAddress() { + return address; + } + + public void setAddress(String address) { + this.address = address; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + int getCores() { + return cores; + } + + @Override public String toString() { + return "Schedulable{" + "id='" + id + '\'' + ", address='" + address + '\'' + ", port=" + port + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java new file mode 100644 index 0000000..1b4cdde --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/scheduler/Scheduler.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.store.scheduler; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.store.exception.WorkerTooBusyException; +import org.apache.carbondata.store.rpc.model.BaseResponse; +import org.apache.carbondata.store.rpc.model.LoadDataRequest; +import org.apache.carbondata.store.rpc.model.QueryRequest; +import org.apache.carbondata.store.rpc.model.QueryResponse; + +/** + * [[Master]] uses Scheduler to pick a Worker to send request + */ +public class Scheduler { + + private static LogService LOGGER = LogServiceFactory.getLogService(Scheduler.class.getName()); + + // mapping of worker IP address to worker instance + private Map<String, Schedulable> ipMapWorker = new HashMap<>(); + private List<Schedulable> workers = new ArrayList<>(); + private AtomicLong index = new AtomicLong(0); + private ExecutorService executors = Executors.newCachedThreadPool(); + + /** + * Pick a Worker according to the address 
and workload of the Worker + * Invoke the RPC and return Future result + */ + public Future<QueryResponse> sendRequestAsync(final Schedulable worker, + final QueryRequest request) { + + LOGGER.info("sending search request to worker " + worker); + worker.workload.incrementAndGet(); + return executors.submit(new Callable<QueryResponse>() { + @Override public QueryResponse call() { + return worker.service.query(request); + } + }); + } + + public BaseResponse sendRequest(final Schedulable worker, + final LoadDataRequest request) { + + LOGGER.info("sending load data request to worker " + worker); + worker.workload.incrementAndGet(); + return worker.service.loadData(request); + } + + public Schedulable pickWorker(String splitAddress) { + Schedulable worker = ipMapWorker.get(splitAddress); + // no local worker available, choose one worker randomly + if (worker == null) { + worker = pickNexWorker(); + } + // check whether worker exceed max workload, if exceeded, pick next worker + int maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.getCores()); + int numTry = workers.size(); + do { + if (worker.workload.get() >= maxWorkload) { + LOGGER.info("worker " + worker + " reach limit, re-select worker..."); + worker = pickNexWorker(); + numTry = numTry - 1; + } else { + numTry = -1; + } + } while (numTry > 0); + if (numTry == 0) { + // tried so many times and still not able to find Worker + throw new WorkerTooBusyException( + "All workers are busy, number of workers: " + workers.size() + ", workload limit:" + + maxWorkload); + } + + return worker; + } + + public Schedulable pickNexWorker() { + return workers.get((int) (index.get() % workers.size())); + } + + /** + * A new searcher is trying to register, add it to the map and connect to this searcher + */ + public void addWorker(Schedulable schedulable) { + workers.add(schedulable); + ipMapWorker.put(schedulable.getAddress(), schedulable); + } + + public void removeWorker(String address) { + Schedulable schedulable = 
ipMapWorker.get(address); + if (schedulable != null) { + ipMapWorker.remove(address); + workers.remove(schedulable); + } + } + + public List<Schedulable> getAllWorkers() { + return workers; + } + + public List<String> getAllWorkerAddresses() { + List<String> addresses = new ArrayList<>(workers.size()); + for (Schedulable worker : workers) { + addresses.add(worker.getAddress()); + } + return addresses; + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java new file mode 100644 index 0000000..fba3413 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/util/StoreUtil.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Map; +import java.util.Properties; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.memory.UnsafeMemoryManager; +import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat; +import org.apache.carbondata.processing.loading.model.CarbonLoadModel; +import org.apache.carbondata.store.conf.StoreConf; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.BasicConfigurator; +import org.apache.log4j.PropertyConfigurator; + +public class StoreUtil { + + private static LogService LOGGER = LogServiceFactory.getLogService(StoreUtil.class.getName()); + + public static void loadProperties(String filePath, StoreConf conf) { + InputStream input = null; + try { + input = new FileInputStream(filePath); + Properties prop = new Properties(); + prop.load(input); + for (Map.Entry<Object, Object> entry : prop.entrySet()) { + conf.conf(entry.getKey().toString(), entry.getValue().toString()); + } + LOGGER.audit("loaded properties: " + filePath); + } catch (IOException ex) { + LOGGER.error(ex, "Failed to load properties file " + filePath); + } finally { + if (input != null) { + try { + input.close(); + } catch (IOException e) { + LOGGER.error(e); + } + } + } + } + + public static void initLog4j(String propertiesFilePath) { + BasicConfigurator.configure(); + PropertyConfigurator.configure(propertiesFilePath); + } + + 
public static byte[] serialize(Object object) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); + try { + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(object); + } catch (IOException e) { + LOGGER.error(e); + } + return baos.toByteArray(); + } + + public static Object deserialize(byte[] bytes) { + if (bytes == null) { + return null; + } + try { + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + return ois.readObject(); + } catch (IOException e) { + LOGGER.error(e); + } catch (ClassNotFoundException e) { + LOGGER.error(e); + } + return null; + } + + public static void configureCSVInputFormat(Configuration configuration, + CarbonLoadModel carbonLoadModel) { + CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar()); + CSVInputFormat.setCSVDelimiter(configuration, carbonLoadModel.getCsvDelimiter()); + CSVInputFormat.setSkipEmptyLine(configuration, carbonLoadModel.getSkipEmptyLine()); + CSVInputFormat.setEscapeCharacter(configuration, carbonLoadModel.getEscapeChar()); + CSVInputFormat.setMaxColumns(configuration, carbonLoadModel.getMaxColumns()); + CSVInputFormat.setNumberOfColumns(configuration, + "" + carbonLoadModel.getCsvHeaderColumns().length); + + CSVInputFormat.setHeaderExtractionEnabled( + configuration, + carbonLoadModel.getCsvHeader() == null || + StringUtils.isEmpty(carbonLoadModel.getCsvHeader())); + + CSVInputFormat.setQuoteCharacter(configuration, carbonLoadModel.getQuoteChar()); + + CSVInputFormat.setReadBufferSize( + configuration, + CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.CSV_READ_BUFFER_SIZE, + CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT)); + } + + public static void clearUnsafeMemory(long taskId) { + UnsafeMemoryManager.INSTANCE.freeMemoryAll(taskId); + UnsafeSortMemoryManager.INSTANCE.freeMemoryAll(taskId); + } + +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java ---------------------------------------------------------------------- diff --git a/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java b/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java new file mode 100644 index 0000000..6fa2191 --- /dev/null +++ b/store/core/src/main/java/org/apache/carbondata/store/worker/Worker.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.store.worker; + +import java.io.IOException; +import java.net.BindException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.store.conf.StoreConf; +import org.apache.carbondata.store.rpc.RegistryService; +import org.apache.carbondata.store.rpc.ServiceFactory; +import org.apache.carbondata.store.rpc.StoreService; +import org.apache.carbondata.store.rpc.impl.StoreServiceImpl; +import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest; +import org.apache.carbondata.store.rpc.model.RegisterWorkerResponse; +import org.apache.carbondata.store.util.StoreUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.RPC; + +public class Worker { + + private static LogService LOGGER = LogServiceFactory.getLogService(Worker.class.getName()); + + private String id; + private RegistryService registry; + private StoreConf conf; + private Configuration hadoopConf; + private RPC.Server server; + + public Worker(StoreConf conf) { + this.conf = conf; + this.hadoopConf = this.conf.newHadoopConf(); + } + + public void start() { + try { + startService(); + registerToMaster(); + } catch (IOException e) { + LOGGER.error(e, "worker failed to start"); + } + } + + private void startService() throws IOException { + BindException exception; + // we will try to create service at worse case 100 times + int numTry = 100; + int coreNum = conf.workerCoreNum(); + String host = conf.workerHost(); + int port = conf.workerPort(); + StoreService queryService = new StoreServiceImpl(this); + do { + try { + server = new RPC.Builder(hadoopConf) + .setNumHandlers(coreNum) + .setBindAddress(host) + .setPort(port) + .setProtocol(StoreService.class) + .setInstance(queryService) + .build(); + server.start(); + + numTry = 0; + exception = null; + } catch (BindException e) { + // port is occupied, increase the port number and try again + 
exception = e; + port = port + 1; + numTry = numTry - 1; + } + } while (numTry > 0); + + if (exception != null) { + // we have tried many times, but still failed to find an available port + LOGGER.error(exception, "worker failed to start"); + throw exception; + } + + conf.conf(StoreConf.WORKER_PORT, port); + LOGGER.info("worker started on " + host + ":" + port + " successfully"); + + } + + public void stop() { + try { + stopService(); + } catch (InterruptedException e) { + LOGGER.error(e, "worker failed to start"); + } + } + + private void stopService() throws InterruptedException { + if (server != null) { + server.stop(); + server.join(); + server = null; + } + } + + private void registerToMaster() throws IOException { + + LOGGER.info("trying to register to master " + conf.masterHost() + ":" + conf.masterPort()); + if (registry == null) { + registry = ServiceFactory.createRegistryService(conf.masterHost(), conf.masterPort()); + } + RegisterWorkerRequest request = + new RegisterWorkerRequest(conf.workerHost(), conf.workerPort(), conf.workerCoreNum()); + try { + RegisterWorkerResponse response = registry.registerWorker(request); + id = response.getWorkerId(); + } catch (Throwable throwable) { + LOGGER.error(throwable, "worker failed to register"); + throw new IOException(throwable); + } + + LOGGER.info("worker " + id + " registered successfully"); + } + + public String getId() { + return id; + } + + public static void main(String[] args) { + if (args.length != 2) { + System.err.println("Usage: Worker <log4j file> <properties file>"); + return; + } + + StoreUtil.initLog4j(args[0]); + Worker worker = new Worker(new StoreConf(args[1])); + worker.start(); + } + + public StoreConf getConf() { + return conf; + } + + public void setConf(StoreConf conf) { + this.conf = conf; + } + + public Configuration getHadoopConf() { + return hadoopConf; + } + + public void setHadoopConf(Configuration hadoopConf) { + this.hadoopConf = hadoopConf; + } +} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala ---------------------------------------------------------------------- diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala b/store/core/src/main/scala/org/apache/carbondata/store/Master.scala deleted file mode 100644 index 2109251..0000000 --- a/store/core/src/main/scala/org/apache/carbondata/store/Master.scala +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store - -import java.io.IOException -import java.net.{BindException, InetAddress} -import java.util.{List => JList, Map => JMap, Objects, Random, UUID} -import java.util.concurrent.{ExecutionException, Future, TimeoutException, TimeUnit} -import java.util.concurrent.atomic.AtomicBoolean - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.ipc.RPC -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.Job - -import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.block.Distributable -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.datastore.row.CarbonRow -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.scan.expression.Expression -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.hadoop.CarbonMultiBlockSplit -import org.apache.carbondata.hadoop.api.CarbonInputFormat -import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil -import org.apache.carbondata.processing.util.CarbonLoaderUtil -import org.apache.carbondata.store.rpc.{RegistryService, ServiceFactory} -import org.apache.carbondata.store.rpc.impl.{RegistryServiceImpl, Status} -import org.apache.carbondata.store.rpc.model._ - -/** - * Master of CarbonSearch. - * It provides a Registry service for worker to register. - * And it provides search API to fire RPC call to workers. 
- */ -@InterfaceAudience.Internal -private[store] class Master { - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - // worker host address map to EndpointRef - - private val random = new Random - - private var registryServer: RPC.Server = _ - - private val scheduler: Scheduler = new Scheduler - - def buildServer(serverHost: String, serverPort: Int): RPC.Server = { - val hadoopConf = FileFactory.getConfiguration - val builder = new RPC.Builder(hadoopConf) - builder - .setBindAddress(serverHost) - .setPort(serverPort) - .setProtocol(classOf[RegistryService]) - .setInstance(new RegistryServiceImpl(this)) - .build - } - - /** start service and listen on port passed in constructor */ - def startService(): Unit = { - if (registryServer == null) { - LOG.info("Start search mode master thread") - val isStarted: AtomicBoolean = new AtomicBoolean(false) - new Thread(new Runnable { - override def run(): Unit = { - val hostAddress = InetAddress.getLocalHost.getHostAddress - var port = CarbonProperties.getSearchMasterPort - var exception: BindException = null - var numTry = 100 // we will try to create service at worse case 100 times - do { - try { - LOG.info(s"building registry-service on $hostAddress:$port") - registryServer = buildServer(hostAddress, port) - numTry = 0 - } catch { - case e: BindException => - // port is occupied, increase the port number and try again - exception = e - LOG.error(s"start registry-service failed: ${e.getMessage}") - port = port + 1 - numTry = numTry - 1 - } - } while (numTry > 0) - if (registryServer == null) { - // we have tried many times, but still failed to find an available port - throw exception - } - if (isStarted.compareAndSet(false, false)) { - synchronized { - isStarted.compareAndSet(false, true) - } - } - LOG.info("starting registry-service") - registryServer.start() - LOG.info("registry-service started") - } - }).start() - var count = 0 - val countThreshold = 5000 - while 
(isStarted.compareAndSet(false, false) && count < countThreshold) { - LOG.info(s"Waiting search mode master to start, retrying $count times") - Thread.sleep(10) - count = count + 1 - } - if (count >= countThreshold) { - LOG.error(s"Search mode try $countThreshold times to start master but failed") - throw new RuntimeException( - s"Search mode try $countThreshold times to start master but failed") - } else { - LOG.info("Search mode master started") - } - } else { - LOG.info("Search mode master has already started") - } - } - - def stopService(): Unit = { - if (registryServer != null) { - registryServer.stop() - registryServer.join() - registryServer = null - } - } - - def stopAllWorkers(): Unit = { - scheduler.getAllWorkers.toSeq.foreach { case (address, schedulable) => - val response = try { - schedulable.service.shutdown(new ShutdownRequest("user")) - } catch { - case throwable: Throwable => - throw new IOException(throwable) - } - scheduler.removeWorker(address) - } - } - - /** A new searcher is trying to register, add it to the map and connect to this searcher */ - def addWorker(request: RegisterWorkerRequest): RegisterWorkerResponse = { - LOG.info(s"Receive Register request from worker ${request.getHostAddress}:${request.getPort} " + - s"with ${request.getCores} cores") - val workerId = UUID.randomUUID().toString - val workerAddress = request.getHostAddress - val workerPort = request.getPort - LOG.info(s"connecting to worker ${request.getHostAddress}:${request.getPort}, " + - s"workerId $workerId") - - val searchService = ServiceFactory.createSearchService(workerAddress, workerPort) - scheduler.addWorker(workerAddress, - new Schedulable(workerId, workerAddress, workerPort, request.getCores, searchService)) - LOG.info(s"worker ${request.getHostAddress}:${request.getPort} registered") - new RegisterWorkerResponse(workerId) - } - - /** - * Execute search by firing RPC call to worker, return the result rows - * @param table table to search - * @param columns 
projection column names - * @param filter filter expression - * @param globalLimit max number of rows required in Master - * @param localLimit max number of rows required in Worker - * @return - */ - def search(table: CarbonTable, columns: Array[String], filter: Expression, - globalLimit: Long, localLimit: Long): Array[CarbonRow] = { - Objects.requireNonNull(table) - Objects.requireNonNull(columns) - if (globalLimit < 0 || localLimit < 0) { - throw new IllegalArgumentException("limit should be positive") - } - - val queryId = random.nextInt - var rowCount = 0 - val output = new ArrayBuffer[CarbonRow] - - def onSuccess(result: QueryResponse): Unit = { - // in case of RPC success, collect all rows in response message - if (result.getQueryId != queryId) { - throw new IOException( - s"queryId in response does not match request: ${result.getQueryId} != $queryId") - } - if (result.getStatus != Status.SUCCESS.ordinal()) { - throw new IOException(s"failure in worker: ${ result.getMessage }") - } - - val itor = result.getRows.iterator - while (itor.hasNext && rowCount < globalLimit) { - output += new CarbonRow(itor.next()) - rowCount = rowCount + 1 - } - LOG.info(s"[QueryId:$queryId] accumulated result size $rowCount") - } - def onFaiure(e: Throwable) = throw new IOException(s"exception in worker: ${ e.getMessage }") - def onTimedout() = throw new ExecutionTimeoutException() - - // prune data and get a mapping of worker hostname to list of blocks, - // then add these blocks to the QueryRequest and fire the RPC call - val nodeBlockMapping: JMap[String, JList[Distributable]] = pruneBlock(table, columns, filter) - val tuple = nodeBlockMapping.asScala.map { case (splitAddress, blocks) => - // Build a QueryRequest - val split = new CarbonMultiBlockSplit(blocks, splitAddress) - val request = - new QueryRequest(queryId, split, table.getTableInfo, columns, filter, localLimit) - - // Find an Endpoind and send the request to it - // This RPC is non-blocking so that we do not need to 
wait before send to next worker - scheduler.sendRequestAsync(splitAddress, request) - } - - // loop to get the result of each Worker - tuple.foreach { case (worker: Schedulable, future: Future[QueryResponse]) => - - // if we have enough data already, we do not need to collect more result - if (rowCount < globalLimit) { - // wait for worker - val response = try { - future.get(CarbonProperties.getInstance().getQueryTimeout.toLong, TimeUnit.SECONDS) - } catch { - case e: ExecutionException => onFaiure(e) - case t: TimeoutException => onTimedout() - } finally { - worker.workload.decrementAndGet() - } - LOG.info(s"[QueryId:$queryId] receive search response from worker " + - s"${worker.address}:${worker.port}") - onSuccess(response) - } - } - output.toArray - } - - /** - * Prune data by using CarbonInputFormat.getSplit - * Return a mapping of host address to list of block - */ - private def pruneBlock( - table: CarbonTable, - columns: Array[String], - filter: Expression): JMap[String, JList[Distributable]] = { - val jobConf = new JobConf(new Configuration) - val job = new Job(jobConf) - val format = CarbonInputFormatUtil.createCarbonTableInputFormat( - job, table, columns, filter, null, null) - - // We will do FG pruning in reader side, so don't do it here - CarbonInputFormat.setFgDataMapPruning(job.getConfiguration, false) - val splits = format.getSplits(job) - val distributables = splits.asScala.map { split => - split.asInstanceOf[Distributable] - } - CarbonLoaderUtil.nodeBlockMapping( - distributables.asJava, - -1, - getWorkers.asJava, - CarbonLoaderUtil.BlockAssignmentStrategy.BLOCK_NUM_FIRST, - null) - } - - /** return hostname of all workers */ - def getWorkers: Seq[String] = scheduler.getAllWorkers.map(_._1).toSeq -} - -// Exception if execution timed out in search mode -class ExecutionTimeoutException extends RuntimeException http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala 
---------------------------------------------------------------------- diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala b/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala deleted file mode 100644 index fb3ef86..0000000 --- a/store/core/src/main/scala/org/apache/carbondata/store/Scheduler.scala +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store - -import java.io.IOException -import java.util.concurrent.{Callable, Executors, Future} -import java.util.concurrent.atomic.AtomicInteger - -import scala.collection.mutable -import scala.reflect.ClassTag -import scala.util.Random - -import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.store.rpc.QueryService -import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse} - -/** - * [[Master]] uses Scheduler to pick a Worker to send request - */ -@InterfaceAudience.Internal -private[store] class Scheduler { - // mapping of worker IP address to worker instance - private val workers = mutable.Map[String, Schedulable]() - private val random = new Random() - private val executors = Executors.newCachedThreadPool() - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - - /** - * Pick a Worker according to the address and workload of the Worker - * Invoke the RPC and return Future result - */ - def sendRequestAsync( - splitAddress: String, - request: QueryRequest): (Schedulable, Future[QueryResponse]) = { - require(splitAddress != null) - if (workers.isEmpty) { - throw new IOException("No worker is available") - } - var worker: Schedulable = pickWorker(splitAddress) - - // check whether worker exceed max workload, if exceeded, pick next worker - val maxWorkload = CarbonProperties.getMaxWorkloadForWorker(worker.cores) - var numTry = workers.size - do { - if (worker.workload.get() >= maxWorkload) { - LOG.info(s"worker ${worker.address}:${worker.port} reach limit, re-select worker...") - worker = pickNextWorker(worker) - numTry = numTry - 1 - } else { - numTry = -1 - } - } while (numTry > 0) - if (numTry == 0) { - // tried so many times and still not able to find Worker - throw new WorkerTooBusyException( - s"All workers are busy, 
number of workers: ${workers.size}, workload limit: $maxWorkload") - } - LOG.info(s"sending search request to worker ${worker.address}:${worker.port}") - val future = executors.submit( - new Callable[QueryResponse] { - override def call(): QueryResponse = worker.service.query(request) - } - ) - worker.workload.incrementAndGet() - (worker, future) - } - - private def pickWorker[T: ClassTag](splitAddress: String) = { - try { - workers(splitAddress) - } catch { - case e: NoSuchElementException => - // no local worker available, choose one worker randomly - pickRandomWorker() - } - } - - /** pick a worker randomly */ - private def pickRandomWorker() = { - val index = random.nextInt(workers.size) - workers.toSeq(index)._2 - } - - /** pick the next worker of the input worker in the [[Scheduler.workers]] */ - private def pickNextWorker(worker: Schedulable) = { - val index = workers.zipWithIndex.find { case ((address, w), index) => - w == worker - }.get._2 - if (index == workers.size - 1) { - workers.toSeq.head._2 - } else { - workers.toSeq(index + 1)._2 - } - } - - /** A new searcher is trying to register, add it to the map and connect to this searcher */ - def addWorker(address: String, schedulable: Schedulable): Unit = { - require(schedulable != null) - require(address.equals(schedulable.address)) - workers(address) = schedulable - } - - def removeWorker(address: String): Unit = { - workers.remove(address) - } - - def getAllWorkers: Iterator[(String, Schedulable)] = workers.iterator -} - -/** - * Represent a Worker which [[Scheduler]] can send - * Search request on it - * @param id Worker ID, a UUID string - * @param cores, number of cores in Worker - * @param service RPC service reference - * @param workload number of outstanding request sent to Worker - */ -private[store] class Schedulable( - val id: String, - val address: String, - val port: Int, - val cores: Int, - val service: QueryService, - var workload: AtomicInteger) { - def this(id: String, address: String, 
port: Int, cores: Int, service: QueryService) = { - this(id, address, port, cores, service, new AtomicInteger()) - } -} - -class WorkerTooBusyException(message: String) extends RuntimeException(message) http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala ---------------------------------------------------------------------- diff --git a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala b/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala deleted file mode 100644 index 2ded00b..0000000 --- a/store/core/src/main/scala/org/apache/carbondata/store/Worker.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.carbondata.store - -import java.io.IOException -import java.net.{BindException, InetAddress} - -import org.apache.hadoop.ipc.RPC - -import org.apache.carbondata.common.annotations.InterfaceAudience -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.store.rpc.{QueryService, RegistryService, ServiceFactory} -import org.apache.carbondata.store.rpc.impl.QueryServiceImpl -import org.apache.carbondata.store.rpc.model.RegisterWorkerRequest - -@InterfaceAudience.Internal -private[store] object Worker { - private val LOG = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - private val hostAddress = InetAddress.getLocalHost.getHostAddress - private var port: Int = _ - private var registry: RegistryService = _ - - def init(masterHostAddress: String, masterPort: Int): Unit = { - LOG.info(s"initializing worker...") - startService() - LOG.info(s"registering to master $masterHostAddress:$masterPort") - val workerId = registerToMaster(masterHostAddress, masterPort) - LOG.info(s"worker registered to master, workerId: $workerId") - } - - def buildServer(serverHost: String, serverPort: Int): RPC.Server = { - val hadoopConf = FileFactory.getConfiguration - val builder = new RPC.Builder(hadoopConf) - builder - .setNumHandlers(Runtime.getRuntime.availableProcessors) - .setBindAddress(serverHost) - .setPort(serverPort) - .setProtocol(classOf[QueryService]) - .setInstance(new QueryServiceImpl) - .build - } - - /** - * Start to listen on port [[CarbonProperties.getSearchWorkerPort]] - */ - private def startService(): Unit = { - new Thread(new Runnable { - override def run(): Unit = { - port = CarbonProperties.getSearchWorkerPort - var searchServer: RPC.Server = null - var exception: BindException = null - var numTry = 100 // we will try to create service at worse case 100 times - do { - try 
{ - LOG.info(s"building search-service on $hostAddress:$port") - searchServer = buildServer(hostAddress, port) - numTry = 0 - } catch { - case e: BindException => - // port is occupied, increase the port number and try again - exception = e - LOG.error(s"start search-service failed: ${e.getMessage}") - port = port + 1 - numTry = numTry - 1 - } - } while (numTry > 0) - if (searchServer == null) { - // we have tried many times, but still failed to find an available port - throw exception - } - LOG.info("starting search-service") - searchServer.start() - LOG.info("search-service started") - } - }).start() - } - - private def registerToMaster(registryHostAddress: String, registryPort: Int): String = { - LOG.info(s"trying to register to master $registryHostAddress:$registryPort") - if (registry == null) { - registry = ServiceFactory.createRegistryService(registryHostAddress, registryPort) - } - val cores = Runtime.getRuntime.availableProcessors() - val request = new RegisterWorkerRequest(hostAddress, port, cores) - val response = try { - registry.registerWorker(request) - } catch { - case throwable: Throwable => - LOG.error(s"worker failed to registered: $throwable") - throw new IOException(throwable) - } - - LOG.info("worker registered") - response.getWorkerId - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala b/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala deleted file mode 100644 index 95e7335..0000000 --- a/store/core/src/test/scala/org/apache/carbondata/store/SchedulerSuite.scala +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.store - -import org.apache.hadoop.ipc.ProtocolSignature -import org.scalatest.{BeforeAndAfterEach, FunSuite} - -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.store.rpc.QueryService -import org.apache.carbondata.store.rpc.model.{QueryRequest, QueryResponse, ShutdownRequest, ShutdownResponse} - -class SchedulerSuite extends FunSuite with BeforeAndAfterEach { - - var scheduler: Scheduler = _ - var w1: Schedulable = _ - var w2: Schedulable = _ - var w3: Schedulable = _ - - override def beforeEach(): Unit = { - scheduler = new Scheduler() - w1 = new Schedulable("id1", "1.1.1.1", 1000, 4, new DummyRef()) - w2 = new Schedulable("id2", "1.1.1.2", 1000, 4, new DummyRef()) - w3 = new Schedulable("id3", "1.1.1.3", 1000, 4, new DummyRef()) - - scheduler.addWorker("1.1.1.1", w1) - scheduler.addWorker("1.1.1.2", w2) - scheduler.addWorker("1.1.1.3", w3) - } - - test("test addWorker, removeWorker, getAllWorkers") { - assertResult(Set("1.1.1.1", "1.1.1.2", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet) - - scheduler.removeWorker("1.1.1.2") - assertResult(Set("1.1.1.1", "1.1.1.3"))(scheduler.getAllWorkers.toMap.keySet) - - val w4 = new Schedulable("id4", 
"1.1.1.4", 1000, 4, new DummyRef()) - scheduler.addWorker("1.1.1.4", w4) - assertResult(Set("1.1.1.1", "1.1.1.3", "1.1.1.4"))(scheduler.getAllWorkers.toMap.keySet) - assertResult("id4")(scheduler.getAllWorkers.toMap.get("1.1.1.4").get.id) - } - - test("test normal schedule") { - val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) - assertResult(w1.id)(r1.id) - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - assertResult(w2.id)(r2.id) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - assertResult(w3.id)(r3.id) - val (r4, _) = scheduler.sendRequestAsync("1.1.1.1", null) - assertResult(w1.id)(r4.id) - val (r5, _) = scheduler.sendRequestAsync("1.1.1.2", null) - assertResult(w2.id)(r5.id) - val (r6, _) = scheduler.sendRequestAsync("1.1.1.3", null) - assertResult(w3.id)(r6.id) - } - - test("test worker unavailable") { - val (r1, _) = scheduler.sendRequestAsync("1.1.1.5", null) - assert(scheduler.getAllWorkers.map(_._2.id).contains(r1.id)) - } - - test("test reschedule when target worker is overload") { - // by default, maxWorkload is number of core * 10, so it is 40 in this test suite - (1 to 40).foreach { i => - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - } - val (r, _) = scheduler.sendRequestAsync("1.1.1.3", null) - // it must be worker1 since worker3 exceed max workload - assertResult(w1.id)(r.id) - } - - test("test all workers are overload") { - // by default, maxWorkload is number of core * 10, so it is 40 in this test suite - (1 to 40).foreach { i => - val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - } - - val e = intercept[WorkerTooBusyException] { - scheduler.sendRequestAsync("1.1.1.3", null) - } - } - - test("test user configured overload param") { - val original = CarbonProperties.getInstance().getProperty( - 
CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) - - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3") - - (1 to 3).foreach { i => - val (r1, _) = scheduler.sendRequestAsync("1.1.1.1", null) - val (r2, _) = scheduler.sendRequestAsync("1.1.1.2", null) - val (r3, _) = scheduler.sendRequestAsync("1.1.1.3", null) - } - - val e = intercept[WorkerTooBusyException] { - scheduler.sendRequestAsync("1.1.1.3", null) - } - - if (original != null) { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, original) - } - } - - test("test invalid property") { - intercept[IllegalArgumentException] { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "-3") - } - var value = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) - assertResult(null)(value) - - intercept[NumberFormatException] { - CarbonProperties.getInstance().addProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT, "3s") - } - value = CarbonProperties.getInstance().getProperty( - CarbonCommonConstants.CARBON_SEARCH_MODE_WORKER_WORKLOAD_LIMIT) - assertResult(null)(value) - } -} - -class DummyRef extends QueryService { - override def query(request: QueryRequest): QueryResponse = ??? - - override def shutdown(request: ShutdownRequest): ShutdownResponse = ??? - - override def getProtocolVersion(protocol: String, - clientVersion: Long): Long = ??? - - override def getProtocolSignature(protocol: String, - clientVersion: Long, - clientMethodsHash: Int): ProtocolSignature = ??? 
-} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/pom.xml ---------------------------------------------------------------------- diff --git a/store/horizon/pom.xml b/store/horizon/pom.xml new file mode 100644 index 0000000..3665e53 --- /dev/null +++ b/store/horizon/pom.xml @@ -0,0 +1,95 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.5.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-horizon</artifactId> + <name>Apache CarbonData :: Horizon </name> + + <properties> + <dev.path>${basedir}/../../dev</dev.path> + <spring.version>2.0.2.RELEASE</spring.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-store-core</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-web</artifactId> + <version>${spring.version}</version> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + <version>4.7.1</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> +
<groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <version>${spring.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.2</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.9.5</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/anltr/Expression.g4 ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/anltr/Expression.g4 b/store/horizon/src/main/anltr/Expression.g4 new file mode 100644 index 0000000..81688cd --- /dev/null +++ b/store/horizon/src/main/anltr/Expression.g4 @@ -0,0 +1,163 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * copy from SqlBase.g4 of Presto and Spark. 
+ */ + +grammar Expression; + +parseFilter + : booleanExpression EOF + ; + +booleanExpression + : predicate + | left=booleanExpression operator=AND right=booleanExpression + | left=booleanExpression operator=OR right=booleanExpression + | '(' booleanExpression ')' + ; + +predicate + : left=primaryExpression comparisonOperator right=primaryExpression + | left=primaryExpression NOT? BETWEEN lower=primaryExpression AND upper=primaryExpression + | left=primaryExpression NOT? IN '(' primaryExpression (',' primaryExpression)* ')' + | left=primaryExpression IS NOT? NULL + ; + +primaryExpression + : constant #constantDefault + | identifier #columnReference + | base=identifier '.' fieldName=identifier #dereference + | '(' booleanExpression ')' #parenthesizedExpression + ; + +constant + : NULL #nullLiteral + | number #numericLiteral + | booleanValue #booleanLiteral + | STRING+ #stringLiteral + ; + +identifier + : IDENTIFIER #unquotedIdentifier + | BACKQUOTED_IDENTIFIER #backQuotedIdentifier + ; + +comparisonOperator + : EQ | NEQ | LT | LTE | GT | GTE + ; + +booleanValue + : TRUE | FALSE + ; + +number + : MINUS? DECIMAL_VALUE #decimalLiteral + | MINUS? INTEGER_VALUE #integerLiteral + | MINUS? BIGINT_LITERAL #bigIntLiteral + | MINUS? SMALLINT_LITERAL #smallIntLiteral + | MINUS? TINYINT_LITERAL #tinyIntLiteral + | MINUS? DOUBLE_LITERAL #doubleLiteral + | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral + ; + +AND: 'AND'; +BETWEEN: 'BETWEEN'; +FALSE: 'FALSE'; +IN: 'IN'; +IS: 'IS'; +NOT: 'NOT'; +NULL: 'NULL'; +OR: 'OR'; +TRUE: 'TRUE'; + +EQ : '='; +NEQ : '<>' | '!='; +LT : '<'; +LTE : '<='; +GT : '>'; +GTE : '>='; + +MINUS: '-'; + +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + ; + +BIGINT_LITERAL + : DIGIT+ 'L' + ; + +SMALLINT_LITERAL + : DIGIT+ 'S' + ; + +TINYINT_LITERAL + : DIGIT+ 'Y' + ; + +INTEGER_VALUE + : DIGIT+ + ; + +DECIMAL_VALUE + : DIGIT+ EXPONENT + | DECIMAL_DIGITS EXPONENT? + ; + +DOUBLE_LITERAL + : DIGIT+ EXPONENT? 
'D' + | DECIMAL_DIGITS EXPONENT? 'D' + ; + +BIGDECIMAL_LITERAL + : DIGIT+ EXPONENT? 'BD' + | DECIMAL_DIGITS EXPONENT? 'BD' + ; + +IDENTIFIER + : (LETTER | DIGIT | '_')+ + ; + +BACKQUOTED_IDENTIFIER + : '`' ( ~'`' | '``' )* '`' + ; + +fragment DECIMAL_DIGITS + : DIGIT+ '.' DIGIT* + | '.' DIGIT+ + ; + +fragment EXPONENT + : 'E' [+-]? DIGIT+ + ; + +fragment DIGIT + : [0-9] + ; + +fragment LETTER + : [A-Z] + ; + +WS + : [ \r\n\t]+ -> channel(HIDDEN) + ; + +// Catch-all for anything we can't recognize. +// We use this to be able to ignore and recover all the text +// when splitting statements with DelimiterLexer +UNRECOGNIZED + : . + ; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java new file mode 100644 index 0000000..1032c51 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/ANTLRNoCaseStringStream.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.horizon.antlr; + +import org.antlr.v4.runtime.ANTLRInputStream; +import org.antlr.v4.runtime.IntStream; + +public class ANTLRNoCaseStringStream extends ANTLRInputStream { + + public ANTLRNoCaseStringStream(String input) { + super(input); + } + + @Override public int LA(int i) { + int la = super.LA(i); + if (la == 0 || la == IntStream.EOF) { + return la; + } else { + return Character.toUpperCase(la); + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java new file mode 100644 index 0000000..0f28ea9 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/FilterVisitor.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.horizon.antlr; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.schema.table.CarbonTable; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.GreaterThanExpression; +import org.apache.carbondata.core.scan.expression.conditional.InExpression; +import org.apache.carbondata.core.scan.expression.conditional.LessThanEqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.LessThanExpression; +import org.apache.carbondata.core.scan.expression.conditional.ListExpression; +import org.apache.carbondata.core.scan.expression.conditional.NotEqualsExpression; +import org.apache.carbondata.core.scan.expression.conditional.NotInExpression; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.OrExpression; +import org.apache.carbondata.core.scan.expression.logical.RangeExpression; +import org.apache.carbondata.horizon.antlr.gen.ExpressionBaseVisitor; +import org.apache.carbondata.horizon.antlr.gen.ExpressionParser; + +public class FilterVisitor extends ExpressionBaseVisitor<Expression> { + + private CarbonTable carbonTable; + + public FilterVisitor(CarbonTable carbonTable) { + this.carbonTable = carbonTable; + } + + public ColumnExpression getColumnExpression(String columnName) { + return getColumnExpression(carbonTable.getTableName(), 
columnName); + } + + public ColumnExpression getColumnExpression(String tableName, String columnName) { + CarbonColumn column = carbonTable.getColumnByName(tableName, columnName); + if (column == null) { + throw new RuntimeException("column not exists: " + tableName + "." + columnName); + } + return new ColumnExpression(column.getColName(), column.getDataType()); + } + + @Override + public Expression visitParseFilter(ExpressionParser.ParseFilterContext ctx) { + return visitBooleanExpression(ctx.booleanExpression()); + } + + @Override + public Expression visitBooleanExpression(ExpressionParser.BooleanExpressionContext ctx) { + if (ctx.AND() != null) { + return new AndExpression(visitBooleanExpression(ctx.left), visitBooleanExpression(ctx.right)); + } else if (ctx.OR() != null) { + return new OrExpression(visitBooleanExpression(ctx.left), visitBooleanExpression(ctx.right)); + } else if (ctx.predicate() != null) { + return visitPredicate(ctx.predicate()); + } else if (!ctx.booleanExpression().isEmpty()) { + return visitBooleanExpression(ctx.booleanExpression().get(0)); + } + + return super.visitBooleanExpression(ctx); + } + + @Override + public Expression visitPredicate(ExpressionParser.PredicateContext ctx) { + ExpressionParser.ComparisonOperatorContext comparision = ctx.comparisonOperator(); + if (comparision != null) { + if (comparision.EQ() != null) { + return new EqualToExpression(visit(ctx.left), visit(ctx.right)); + } else if (comparision.GT() != null) { + return new GreaterThanExpression(visit(ctx.left), visit(ctx.right)); + } else if (comparision.GTE() != null) { + return new GreaterThanEqualToExpression(visit(ctx.left), visit(ctx.right)); + } else if (comparision.LT() != null) { + return new LessThanExpression(visit(ctx.left), visit(ctx.right)); + } else if (comparision.LTE() != null) { + return new LessThanEqualToExpression(visit(ctx.left), visit(ctx.right)); + } else if (comparision.NEQ() != null) { + return new NotEqualsExpression(visit(ctx.left), 
visit(ctx.right)); + } + } else if (ctx.BETWEEN() != null) { + if (ctx.NOT() != null) { + return new RangeExpression(new GreaterThanExpression(visit(ctx.left), visit(ctx.upper)), + new LessThanExpression(visit(ctx.left), visit(ctx.lower))); + } else { + return new RangeExpression( + new GreaterThanEqualToExpression(visit(ctx.left), visit(ctx.lower)), + new LessThanEqualToExpression(visit(ctx.left), visit(ctx.upper))); + } + } else if (ctx.IN() != null) { + List<Expression> listExpression = new ArrayList<Expression>(); + List<ExpressionParser.PrimaryExpressionContext> primaryExpressionContexts = + ctx.primaryExpression(); + for (ExpressionParser.PrimaryExpressionContext primary : primaryExpressionContexts) { + if (ctx.left != primary) { + listExpression.add(visit(primary)); + } + } + if (ctx.NOT() != null) { + return new NotInExpression(visit(ctx.left), new ListExpression(listExpression)); + } else { + return new InExpression(visit(ctx.left), new ListExpression(listExpression)); + } + } else if (ctx.NULL() != null) { + if (ctx.NOT() == null) { + return new EqualToExpression( + visit(ctx.left), new LiteralExpression(null, DataTypes.STRING), true); + } else { + return new NotEqualsExpression(visit(ctx.left), + new LiteralExpression(null, DataTypes.STRING), true); + } + } + return super.visitPredicate(ctx); + } + + @Override + public Expression visitNullLiteral(ExpressionParser.NullLiteralContext ctx) { + return null; + } + + @Override + public Expression visitDecimalLiteral(ExpressionParser.DecimalLiteralContext ctx) { + return new LiteralExpression(new BigDecimal(ctx.getText()), + DataTypes.createDefaultDecimalType()); + } + + @Override + public Expression visitIntegerLiteral(ExpressionParser.IntegerLiteralContext ctx) { + return new LiteralExpression(Integer.parseInt(ctx.getText()), DataTypes.INT); + } + + @Override + public Expression visitBigIntLiteral(ExpressionParser.BigIntLiteralContext ctx) { + return new LiteralExpression(Long.parseLong(ctx.getText()), 
DataTypes.LONG); + } + + @Override + public Expression visitSmallIntLiteral(ExpressionParser.SmallIntLiteralContext ctx) { + return new LiteralExpression(Short.parseShort(ctx.getText()), DataTypes.SHORT); + } + + @Override + public Expression visitTinyIntLiteral(ExpressionParser.TinyIntLiteralContext ctx) { + return new LiteralExpression(Short.parseShort(ctx.getText()), DataTypes.SHORT); + } + + @Override + public Expression visitDoubleLiteral(ExpressionParser.DoubleLiteralContext ctx) { + return new LiteralExpression(Double.parseDouble(ctx.getText()), DataTypes.DOUBLE); + } + + @Override + public Expression visitBigDecimalLiteral(ExpressionParser.BigDecimalLiteralContext ctx) { + return new LiteralExpression(new BigDecimal(ctx.getText()), + DataTypes.createDefaultDecimalType()); + } + + @Override + public Expression visitBooleanLiteral(ExpressionParser.BooleanLiteralContext ctx) { + ExpressionParser.BooleanValueContext booleanValueContext = ctx.booleanValue(); + if (booleanValueContext.FALSE() != null) { + return new LiteralExpression(false, DataTypes.BOOLEAN); + } else { + return new LiteralExpression(true, DataTypes.BOOLEAN); + } + } + + @Override + public Expression visitStringLiteral(ExpressionParser.StringLiteralContext ctx) { + return new LiteralExpression(ctx.getText(), DataTypes.STRING); + } + + @Override + public Expression visitUnquotedIdentifier(ExpressionParser.UnquotedIdentifierContext ctx) { + return getColumnExpression(ctx.getText()); + } + + private String identifier(String identifier) { + return identifier.replace("", "`"); + } + + @Override + public Expression visitBackQuotedIdentifier(ExpressionParser.BackQuotedIdentifierContext ctx) { + return getColumnExpression(identifier(ctx.getText())); + } + + @Override + public Expression visitDereference(ExpressionParser.DereferenceContext ctx) { + String tableName = identifier(ctx.base.getText()); + String columnName = identifier(ctx.fieldName.getText()); + return getColumnExpression(tableName, 
columnName); + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens new file mode 100644 index 0000000..0ba3c59 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/Expression.tokens @@ -0,0 +1,51 @@ +T__0=1 +T__1=2 +T__2=3 +T__3=4 +AND=5 +BETWEEN=6 +FALSE=7 +IN=8 +IS=9 +NOT=10 +NULL=11 +OR=12 +TRUE=13 +EQ=14 +NEQ=15 +LT=16 +LTE=17 +GT=18 +GTE=19 +MINUS=20 +STRING=21 +BIGINT_LITERAL=22 +SMALLINT_LITERAL=23 +TINYINT_LITERAL=24 +INTEGER_VALUE=25 +DECIMAL_VALUE=26 +DOUBLE_LITERAL=27 +BIGDECIMAL_LITERAL=28 +IDENTIFIER=29 +BACKQUOTED_IDENTIFIER=30 +WS=31 +UNRECOGNIZED=32 +'('=1 +')'=2 +','=3 +'.'=4 +'AND'=5 +'BETWEEN'=6 +'FALSE'=7 +'IN'=8 +'IS'=9 +'NOT'=10 +'NULL'=11 +'OR'=12 +'TRUE'=13 +'='=14 +'<'=16 +'<='=17 +'>'=18 +'>='=19 +'-'=20 http://git-wip-us.apache.org/repos/asf/carbondata/blob/4437920a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java ---------------------------------------------------------------------- diff --git a/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java new file mode 100644 index 0000000..42ef9c8 --- /dev/null +++ b/store/horizon/src/main/java/org/apache/carbondata/horizon/antlr/gen/ExpressionBaseVisitor.java @@ -0,0 +1,168 @@ +// Generated from /home/david/Documents/code/carbondata/store/horizon/src/main/anltr/Expression.g4 by ANTLR 4.7 +package org.apache.carbondata.horizon.antlr.gen; +import org.antlr.v4.runtime.tree.AbstractParseTreeVisitor; + +/** + * This class 
/**
 * This class provides an empty implementation of {@link ExpressionVisitor},
 * which can be extended to create a visitor which only needs to handle a subset
 * of the available methods.
 *
 * <p>This file is generated by ANTLR from {@code Expression.g4}; change the grammar and
 * regenerate rather than editing it by hand. Every method below simply delegates to
 * {@link #visitChildren}.
 *
 * @param <T> The return type of the visit operation. Use {@link Void} for
 * operations with no return type.
 */
public class ExpressionBaseVisitor<T> extends AbstractParseTreeVisitor<T> implements ExpressionVisitor<T> {
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code parseFilter} rule.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitParseFilter(ExpressionParser.ParseFilterContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code booleanExpression} rule.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitBooleanExpression(ExpressionParser.BooleanExpressionContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code predicate} rule.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitPredicate(ExpressionParser.PredicateContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code constantDefault} alternative of {@code primaryExpression}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitConstantDefault(ExpressionParser.ConstantDefaultContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code columnReference} alternative of {@code primaryExpression}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitColumnReference(ExpressionParser.ColumnReferenceContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code dereference} alternative of {@code primaryExpression}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitDereference(ExpressionParser.DereferenceContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code parenthesizedExpression} alternative of {@code primaryExpression}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitParenthesizedExpression(ExpressionParser.ParenthesizedExpressionContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code nullLiteral} alternative of {@code constant}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitNullLiteral(ExpressionParser.NullLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code numericLiteral} alternative of {@code constant}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitNumericLiteral(ExpressionParser.NumericLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code booleanLiteral} alternative of {@code constant}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitBooleanLiteral(ExpressionParser.BooleanLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code stringLiteral} alternative of {@code constant}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitStringLiteral(ExpressionParser.StringLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code unquotedIdentifier} alternative of {@code identifier}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitUnquotedIdentifier(ExpressionParser.UnquotedIdentifierContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code backQuotedIdentifier} alternative of {@code identifier}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitBackQuotedIdentifier(ExpressionParser.BackQuotedIdentifierContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code comparisonOperator} rule.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitComparisonOperator(ExpressionParser.ComparisonOperatorContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code booleanValue} rule.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitBooleanValue(ExpressionParser.BooleanValueContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code decimalLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitDecimalLiteral(ExpressionParser.DecimalLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code integerLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitIntegerLiteral(ExpressionParser.IntegerLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code bigIntLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitBigIntLiteral(ExpressionParser.BigIntLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code smallIntLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitSmallIntLiteral(ExpressionParser.SmallIntLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code tinyIntLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitTinyIntLiteral(ExpressionParser.TinyIntLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code doubleLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitDoubleLiteral(ExpressionParser.DoubleLiteralContext ctx) { return visitChildren(ctx); }
	/**
	 * {@inheritDoc}
	 *
	 * <p>Handles the {@code bigDecimalLiteral} alternative of {@code number}.</p>
	 *
	 * <p>The default implementation returns the result of calling
	 * {@link #visitChildren} on {@code ctx}.</p>
	 */
	@Override public T visitBigDecimalLiteral(ExpressionParser.BigDecimalLiteralContext ctx) { return visitChildren(ctx); }
}