Github user denalex commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1344#discussion_r171055669
  
    --- Diff: 
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java
 ---
    @@ -0,0 +1,502 @@
    +package org.apache.hawq.pxf.plugins.ignite;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +import java.io.InputStreamReader;
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.UnsupportedEncodingException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.net.MalformedURLException;
    +import java.net.ProtocolException;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +import com.google.gson.JsonParser;
    +import com.google.gson.JsonElement;
    +import com.google.gson.JsonArray;
    +
    +
    +/**
    + * Ignite database read and write accessor
    + */
    +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, 
WriteAccessor {
    +    private static final Log LOG = LogFactory.getLog(IgniteAccessor.class);
    +
    +    // Prepared URLs to send to Ignite when reading data
    +    private String urlReadStart = null;
    +    private String urlReadFetch = null;
    +    private String urlReadClose = null;
    +    // Set to true when Ignite reported all the data for the SELECT query 
was retrieved
    +    private boolean isLastReadFinished = false;
    +    // A buffer to store the SELECT query results (without Ignite metadata)
    +    private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>();
    +
    +    // A template for the INSERT 
    +    private String queryWrite = null;
    +    // Set to true when the INSERT operation is in progress
    +    private boolean isWriteActive = false;
    +    // A buffer to store prepared values for the INSERT query
    +    private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>();
    +    
    +
    +    /**
    +     * Class constructor.
    +     */
    +    public IgniteAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     */
    +    @Override
    +    public boolean openForRead() throws Exception {
    +        if (bufferSize == 0) {
    +            bufferSize = 1;
    +        }
    +
    +        StringBuilder sb = new StringBuilder();
    +
    +        // Insert a list of fields to be selected
    +        ArrayList<ColumnDescriptor> columns = 
inputData.getTupleDescription();
    +        if (columns == null) {
    +            throw new UserDataException("Tuple description must be 
present.");
    +        }
    +        sb.append("SELECT ");
    +        for (int i = 0; i < columns.size(); i++) {
    +            ColumnDescriptor column = columns.get(i);
    +            if (i > 0) {
    +                sb.append(",");
    +            }
    +            sb.append(column.columnName());
    +        }
    +
    +        // Insert the name of the table to select values from
    +        sb.append(" FROM ");
    +        String tableName = inputData.getDataSource();
    +        if (tableName == null) {
    +            throw new UserDataException("Table name must be set as 
DataSource.");
    +        }
    +        sb.append(tableName);
    +
    +        // Insert query constraints
    +        // Note: Filter constants may be provided separately from the 
query itself, mostly for the safety of the SQL queries; at the moment, however, 
they are passed in the query itself.
    +        ArrayList<String> filterConstants = null;
    +        if (inputData.hasFilter()) {
    +            WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData);
    +            String whereSql = filterBuilder.buildWhereSQL();
    +
    +            if (whereSql != null) {
    +                sb.append(" WHERE ").append(whereSql);
    +            }
    +        }
    +
    +        // Insert partition constraints
    +        sb = new 
IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb);
    +
    +        // Format URL
    +        urlReadStart = buildQueryFldexe(sb.toString(), filterConstants);
    +
    +        // Send the first REST request that opens the connection
    +        JsonElement response = sendRestRequest(urlReadStart);
    +
    +        // Build 'urlReadFetch' and 'urlReadClose'
    +        isLastReadFinished = 
response.getAsJsonObject().get("last").getAsBoolean();
    +        urlReadFetch = 
buildQueryFetch(response.getAsJsonObject().get("queryId").getAsInt());
    +        urlReadClose = 
buildQueryCls(response.getAsJsonObject().get("queryId").getAsInt());
    +
    +        LOG.info("Ignite read request. URL: '" + urlReadStart + "'");
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     */
    +    @Override
    +    public OneRow readNextObject() throws Exception {
    +        if (urlReadFetch == null) {
    +            LOG.error("readNextObject(): urlReadFetch is null. This means 
the Ignite qryfldexe query was not executed properly");
    +            throw new ProtocolException("readNextObject(): urlReadFetch is 
null. This means the Ignite qryfldexe query was not executed properly");
    +        }
    +
    +        if (bufferRead.isEmpty()) {
    +            // Refill buffer
    +            if (isLastReadFinished) {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("readNextObject(): All the data received 
from Ignite");
    +                }
    +                return null;
    +            }
    +
    +            JsonElement response = sendRestRequest(urlReadFetch);
    +            isLastReadFinished = 
response.getAsJsonObject().get("last").getAsBoolean();
    +
    +            // Parse 'items'
    +            Iterator<JsonElement> itemsIterator = 
response.getAsJsonObject().get("items").getAsJsonArray().iterator();
    +            while (itemsIterator.hasNext()) {
    +                if 
(!bufferRead.add(itemsIterator.next().getAsJsonArray())) {
    +                    LOG.error("readNextObject(): Buffer refill failed (not 
enough memory in 'bufferRead')");
    +                    throw new OutOfMemoryError("readNextObject(): not 
enough memory in 'bufferRead'");
    +                }
    +            }
    +
    +            // Check again in case "response" contains no elements
    +            if (bufferRead.isEmpty()) {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("readNextObject(): Buffer refill failed");
    +                    LOG.debug("readNextObject(): All the data received 
from Ignite");
    +                }
    +                return null;
    +            }
    +        }
    +
    +        return new OneRow(bufferRead.pollFirst());
    +    }
    +
    +    /**
    +     * closeForRead() implementation
    +     */
    +    @Override
    +    public void closeForRead() {
    +        if (urlReadClose != null) {
    +            try {
    +                sendRestRequest(urlReadClose);
    +            }
    +            catch (Exception e) {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("closeForRead() Exception: " + 
e.getClass().getSimpleName());
    +                }
    +            }
    +        }
    +        isLastReadFinished = false;
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("Ignite read request finished. URL: '" + 
urlReadClose + "'");
    +        }
    +    }
    +
    +    /**
    +     * openForWrite() implementation.
    +     * No queries are sent to Ignite by this procedure, so if there are 
some problems (for example, with connection), they will be revealed only during 
the execution of 'writeNextObject()'
    +     */
    +    @Override
    +    public boolean openForWrite() throws UserDataException {
    +        // This is a temporary solution. At the moment there is no other 
way (except for the usage of user-defined parameters) to get the correct name 
of Ignite table: GPDB inserts extra data into the address, as required by 
Hadoop.
    +        // Note that if no extra data is present, the 'definedSource' will 
be left unchanged
    +        String definedSource = inputData.getDataSource();
    +        Pattern pattern = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
    +        Matcher matcher = pattern.matcher(definedSource);
    +        if (matcher.find()) {
    +            inputData.setDataSource(matcher.group(1));
    +        }
    +
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("INSERT INTO ");
    +
    +        // Insert the table name
    +        String tableName = inputData.getDataSource();
    +        if (tableName == null) {
    +            throw new UserDataException("Table name must be set as 
DataSource.");
    +        }
    +        sb.append(tableName);
    +
    +        // Insert the column names
    +        sb.append("(");
    +        ArrayList<ColumnDescriptor> columns = 
inputData.getTupleDescription();
    +        if (columns == null) {
    +            throw new UserDataException("Tuple description must be 
present.");
    +        }
    +        String fieldDivisor = "";
    +        for (int i = 0; i < columns.size(); i++) {
    +            sb.append(fieldDivisor);
    +            fieldDivisor = ", ";
    +            sb.append(columns.get(i).columnName());
    +        }
    +        sb.append(")");
    +
    +        sb.append(" VALUES ");
    +
    +        queryWrite = sb.toString();
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation
    +     */
    +    @Override
    +    public boolean writeNextObject(OneRow currentRow) throws Exception {
    +        boolean currentRowInBuffer = bufferWrite.add(currentRow);
    +
    +        if (!isWriteActive) {
    +            if (!currentRowInBuffer) {
    +                LOG.error("writeNextObject(): Failed (not enough memory in 
'bufferWrite')");
    +                throw new OutOfMemoryError("writeNextObject(): not enough 
memory in 'bufferWrite'");
    +            }
    +            LOG.info("Ignite write request. Query: '" + queryWrite + "'");
    +            sendInsertRestRequest(queryWrite);
    +            bufferWrite.removeFirst();
    +            isWriteActive = true;
    +            return true;
    +        }
    +
    +        if ((bufferWrite.size() >= bufferSize) || (!currentRowInBuffer)) {
    +            sendInsertRestRequest(queryWrite);
    +        }
    +
    +        if (!currentRowInBuffer) {
    +            if (!bufferWrite.add(currentRow)) {
    +                LOG.error("writeNextObject(): Failed (not enough memory in 
'bufferSend')");
    +                throw new OutOfMemoryError("writeNextObject(): not enough 
memory in 'bufferSend'");
    +            }
    +        }
    +        
    +        return true;
    +    }
    +
    +    /**
    +     * closeForWrite() implementation
    +     */
    +    @Override
    +    public void closeForWrite() throws Exception {
    +        if (!bufferWrite.isEmpty()) {
    +            sendInsertRestRequest(queryWrite);
    +        }
    +        if (isWriteActive) {
    +            // At this point, the request must have finished successfully; 
otherwise an exception would be thrown before
    +            LOG.info("Ignite write request finished successfully. Query: 
'" + queryWrite + "'");
    +        }
    +        isWriteActive = false;
    +    }
    +
    +    /**
    +     * Build HTTP GET query for Ignite REST API with command 'qryfldexe'
    +     * 
    +     * @param querySql SQL query with filter constants. The constants must 
be replaced by "?" if 'filterConstants' is not given
    +     * @param filterConstants
    +     * 
    +     * @return Prepared HTTP query. The query will be properly encoded 
with {@link java.net.URLEncoder}
    +     * 
    +     * @throws UnsupportedEncodingException from {@link 
java.net.URLEncoder.encode()}
    +     */
    +    private String buildQueryFldexe(String querySql, ArrayList<String> 
filterConstants) throws UnsupportedEncodingException {
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("http://");
    +        sb.append(igniteHost);
    +        sb.append("/ignite");
    +        sb.append("?");
    +        sb.append("cmd=qryfldexe");
    +        sb.append("&");
    +        sb.append("pageSize=0");
    +        sb.append("&");
    +        if (cacheName != null) {
    +            sb.append("cacheName=");
    +            // Note that Ignite supports only "good" cache names (those 
that should be left unchanged by the URLEncoder.encode())
    +            sb.append(URLEncoder.encode(cacheName, "UTF-8"));
    +            sb.append("&");
    +        }
    +        int counter = 1;
    +        if (filterConstants != null) {
    +            for (String constant : filterConstants) {
    +                sb.append("arg");
    +                sb.append(counter);
    +                sb.append("=");
    +                sb.append(URLEncoder.encode(constant, "UTF-8"));
    +                sb.append("&");
    +                counter += 1;
    +            }
    +        }
    +        sb.append("qry=");
    +        sb.append(URLEncoder.encode(querySql, "UTF-8"));
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Build HTTP GET query for Ignite REST API with command 'qryfetch'
    +     * This query is used to retrieve data after the 'qryfldexe' command 
started
    +     * 
    +     * @param queryId ID of the query assigned by Ignite when the query 
started
    +     *
    +     * @return Prepared HTTP query
    +     */
    +    private String buildQueryFetch(int queryId) {
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("http://");
    +        sb.append(igniteHost);
    +        sb.append("/ignite");
    +        sb.append("?");
    +        sb.append("cmd=qryfetch");
    +        sb.append("&");
    +        sb.append("pageSize=");
    +        sb.append(bufferSize);
    +        sb.append("&");
    +        sb.append("qryId=");
    +        sb.append(queryId);
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Build HTTP GET query for Ignite REST API with command 'qrycls'
    +     * This query is used to close query resources on Ignite side
    +     * 
    +     * @param queryId ID of the query assigned by Ignite when the query 
started
    +     *
    +     * @return Prepared HTTP query
    +     */
    +    private String buildQueryCls(int queryId) {
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("http://");
    +        sb.append(igniteHost);
    +        sb.append("/ignite");
    +        sb.append("?");
    +        sb.append("cmd=qrycls");
    +        sb.append("&");
    +        sb.append("qryId=");
    +        sb.append(queryId);
    +
    +        return sb.toString();
    +    }
    +
    +    /**
    +     * Send a REST request to the Ignite server
    +     * 
    +     * @param query A prepared and properly encoded HTTP GET request
    +     * 
    +     * @return "response" field from the received JSON object
    +     * (See Ignite REST API documentation for details)
    +     * 
    +     * @throws ProtocolException if Ignite reports an error in its JSON 
response
    +     * @throws MalformedURLException if URL is malformed
    +     * @throws IOException in case of connection failure
    +     */
    +    private JsonElement sendRestRequest(String query) throws 
ProtocolException, MalformedURLException, IOException {
    +        // Create URL object
    +        URL url;
    +        try {
    +            url = new URL(query);
    +        }
    +        catch (MalformedURLException e) {
    +            LOG.error("sendRestRequest(): Failed (malformed URL). URL is 
'" + query + "'");
    +            throw e;
    +        }
    +
    +        // Connect to the Ignite server, send query and get raw response
    +        String responseRaw = null;
    +        try {
    +            StringBuilder sb = new StringBuilder();
    +            BufferedReader reader = new BufferedReader(new 
InputStreamReader(url.openStream()));
    +            String responseLine;
    +            while ((responseLine = reader.readLine()) != null) {
    +                sb.append(responseLine);
    +            }
    +            reader.close();
    --- End diff --
    
    Should this line go into a finally block, in case reading fails and throws an 
Exception?


---

Reply via email to