GitHub user hornn commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1344#discussion_r171081924
  
    --- Diff: 
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java
 ---
    @@ -0,0 +1,502 @@
    +package org.apache.hawq.pxf.plugins.ignite;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +import java.io.InputStreamReader;
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.UnsupportedEncodingException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.net.MalformedURLException;
    +import java.net.ProtocolException;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +import com.google.gson.JsonParser;
    +import com.google.gson.JsonElement;
    +import com.google.gson.JsonArray;
    +
    +
    +/**
    + * Ignite database read and write accessor
    + */
    +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, 
WriteAccessor {
    +    private static final Log LOG = LogFactory.getLog(IgniteAccessor.class);
    +
    +    // Prepared URLs to send to Ignite when reading data: start the query (qryfldexe),
    +    // fetch the next page of results, and close the query
    +    private String urlReadStart = null;
    +    private String urlReadFetch = null;
    +    private String urlReadClose = null;
    +    // Set to true when Ignite reports (via the "last" field of a response) that all the data for the SELECT query was retrieved
    +    private boolean isLastReadFinished = false;
    +    // A buffer to store the SELECT query results (without Ignite metadata); one JsonArray per row
    +    private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>();
    +
    +    // A template for the INSERT query: "INSERT INTO <table>(<columns>) VALUES " (built in openForWrite())
    +    private String queryWrite = null;
    +    // Set to true when the INSERT operation is in progress
    +    private boolean isWriteActive = false;
    +    // A buffer to store the rows passed to writeNextObject() until they are sent with the INSERT query
    +    private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>();
    +    
    +
    +    /**
    +     * Creates an accessor for reading from and writing to an Ignite table.
    +     *
    +     * @param inputData PXF request data, handed over to {@link IgnitePlugin}
    +     * @throws UserDataException propagated from the {@link IgnitePlugin} constructor
    +     */
    +    public IgniteAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     */
    +    @Override
    +    public boolean openForRead() throws Exception {
    +        if (bufferSize == 0) {
    +            bufferSize = 1;
    +        }
    +
    +        StringBuilder sb = new StringBuilder();
    +
    +        // Insert a list of fields to be selected
    +        ArrayList<ColumnDescriptor> columns = 
inputData.getTupleDescription();
    +        if (columns == null) {
    +            throw new UserDataException("Tuple description must be 
present.");
    +        }
    +        sb.append("SELECT ");
    +        for (int i = 0; i < columns.size(); i++) {
    +            ColumnDescriptor column = columns.get(i);
    +            if (i > 0) {
    +                sb.append(",");
    +            }
    +            sb.append(column.columnName());
    +        }
    +
    +        // Insert the name of the table to select values from
    +        sb.append(" FROM ");
    +        String tableName = inputData.getDataSource();
    +        if (tableName == null) {
    +            throw new UserDataException("Table name must be set as 
DataSource.");
    +        }
    +        sb.append(tableName);
    +
    +        // Insert query constraints
    +        // Note: Filter constants may be provided separately from the 
query itself, mostly for the safety of the SQL queries; at the moment, however, 
they are passed in the query itself.
    +        ArrayList<String> filterConstants = null;
    +        if (inputData.hasFilter()) {
    +            WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData);
    +            String whereSql = filterBuilder.buildWhereSQL();
    +
    +            if (whereSql != null) {
    +                sb.append(" WHERE ").append(whereSql);
    +            }
    +        }
    +
    +        // Insert partition constaints
    +        sb = new 
IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb);
    +
    +        // Format URL
    +        urlReadStart = buildQueryFldexe(sb.toString(), filterConstants);
    +
    +        // Send the first REST request that opens the connection
    +        JsonElement response = sendRestRequest(urlReadStart);
    +
    +        // Build 'urlReadFetch' and 'urlReadClose'
    +        isLastReadFinished = 
response.getAsJsonObject().get("last").getAsBoolean();
    +        urlReadFetch = 
buildQueryFetch(response.getAsJsonObject().get("queryId").getAsInt());
    +        urlReadClose = 
buildQueryCls(response.getAsJsonObject().get("queryId").getAsInt());
    +
    +        LOG.info("Ignite read request. URL: '" + urlReadStart + "'");
    +        return true;
    +    }
    +
    +    /**
    +     * readNextObject() implementation
    +     */
    +    @Override
    +    public OneRow readNextObject() throws Exception {
    +        if (urlReadFetch == null) {
    +            LOG.error("readNextObject(): urlReadFetch is null. This means 
the Ignite qryfldexe query was not executed properly");
    +            throw new ProtocolException("readNextObject(): urlReadFetch is 
null. This means the Ignite qryfldexe query was not executed properly");
    +        }
    +
    +        if (bufferRead.isEmpty()) {
    +            // Refill buffer
    +            if (isLastReadFinished) {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("readNextObject(): All the data received 
from Ignite");
    +                }
    +                return null;
    +            }
    +
    +            JsonElement response = sendRestRequest(urlReadFetch);
    +            isLastReadFinished = 
response.getAsJsonObject().get("last").getAsBoolean();
    +
    +            // Parse 'items'
    +            Iterator<JsonElement> itemsIterator = 
response.getAsJsonObject().get("items").getAsJsonArray().iterator();
    +            while (itemsIterator.hasNext()) {
    +                if 
(!bufferRead.add(itemsIterator.next().getAsJsonArray())) {
    +                    LOG.error("readNextObject(): Buffer refill failed (not 
enough memory in 'bufferRead')");
    +                    throw new OutOfMemoryError("readNextObject(): not 
enough memory in 'bufferRead'");
    +                }
    +            }
    +
    +            // Check again in case "response" contains no elements
    +            if (bufferRead.isEmpty()) {
    +                if (LOG.isDebugEnabled()) {
    +                    LOG.debug("readNextObject(): Buffer refill failed");
    +                    LOG.debug("readNextObject(): All the data received 
from Ignite");
    +                }
    +                return null;
    +            }
    +        }
    +
    +        return new OneRow(bufferRead.pollFirst());
    +    }
    +
    +    /**
    +     * closeForRead() implementation.
    +     * Asks Ignite to close the query (best effort: a failure is logged, not thrown)
    +     * and resets the read state of this accessor.
    +     */
    +    @Override
    +    public void closeForRead() {
    +        String closedUrl = urlReadClose;
    +        if (urlReadClose != null) {
    +            try {
    +                sendRestRequest(urlReadClose);
    +            }
    +            catch (Exception e) {
    +                // Failing to close the query must not fail the read, but the failure
    +                // (and its cause) should not be silently lost either
    +                LOG.warn("closeForRead(): failed to close the Ignite query. URL: '" + urlReadClose + "'", e);
    +            }
    +        }
    +
    +        // Reset the read state, so that a repeated close does not send the request again
    +        // and no stale rows are left in the buffer
    +        urlReadStart = null;
    +        urlReadFetch = null;
    +        urlReadClose = null;
    +        isLastReadFinished = false;
    +        bufferRead.clear();
    +
    +        if (LOG.isDebugEnabled()) {
    +            LOG.debug("Ignite read request finished. URL: '" + closedUrl + "'");
    +        }
    +    }
    +
    +    // Matches (anywhere in the data source) "/<name>/" followed by "<digits>-<digits>_<digits>";
    +    // group 1 is the table name. Compiled once instead of on every openForWrite() call.
    +    private static final Pattern WRITE_DATA_SOURCE_PATTERN = Pattern.compile("/(.*)/[0-9]*-[0-9]*_[0-9]*");
    +
    +    /**
    +     * openForWrite() implementation.
    +     * Prepares the INSERT query template "INSERT INTO table(col1, col2, ...) VALUES ".
    +     * No queries are sent to Ignite by this procedure, so if there are some problems
    +     * (for example, with connection), they will be revealed only during the execution
    +     * of 'writeNextObject()'.
    +     *
    +     * @return true
    +     * @throws UserDataException if the data source (table name) or the tuple description is missing
    +     */
    +    @Override
    +    public boolean openForWrite() throws UserDataException {
    +        String definedSource = inputData.getDataSource();
    +        // Check before matching: a null data source would otherwise fail with a
    +        // NullPointerException in Pattern.matcher() instead of this UserDataException
    +        if (definedSource == null) {
    +            throw new UserDataException("Table name must be set as DataSource.");
    +        }
    +
    +        // This is a temporary solution. At the moment there is no other way (except for the
    +        // usage of user-defined parameters) to get the correct name of Ignite table: GPDB
    +        // inserts extra data into the address, as required by Hadoop.
    +        // Note that if no extra data is present, the data source is left unchanged.
    +        Matcher matcher = WRITE_DATA_SOURCE_PATTERN.matcher(definedSource);
    +        if (matcher.find()) {
    +            inputData.setDataSource(matcher.group(1));
    +        }
    +        String tableName = inputData.getDataSource();
    +
    +        ArrayList<ColumnDescriptor> columns = inputData.getTupleDescription();
    +        if (columns == null) {
    +            throw new UserDataException("Tuple description must be present.");
    +        }
    +
    +        // INSERT INTO <table>(<columns>) VALUES
    +        StringBuilder sb = new StringBuilder("INSERT INTO ");
    +        sb.append(tableName).append("(");
    +        String fieldDivisor = "";
    +        for (ColumnDescriptor column : columns) {
    +            sb.append(fieldDivisor).append(column.columnName());
    +            fieldDivisor = ", ";
    +        }
    +        sb.append(")").append(" VALUES ");
    +
    +        queryWrite = sb.toString();
    +        return true;
    +    }
    +
    +    /**
    +     * writeNextObject() implementation.
    +     * Adds the row to 'bufferWrite'. On the first call an INSERT request is sent
    +     * right away; afterwards a request is sent once 'bufferWrite' holds at least
    +     * 'bufferSize' rows. Rows still buffered at the end are handled by closeForWrite().
    +     * NOTE(review): sendInsertRestRequest() is defined outside this excerpt; it presumably
    +     * builds the VALUES list from 'bufferWrite' and empties it. TODO confirm.
    +     *
    +     * @param currentRow the row to write
    +     * @return always true
    +     */
    +    @Override
    +    public boolean writeNextObject(OneRow currentRow) throws Exception {
    +        // NOTE(review): LinkedList.add() always returns true, so currentRowInBuffer is
    +        // always true and every '!currentRowInBuffer' branch below is unreachable
    +        boolean currentRowInBuffer = bufferWrite.add(currentRow);
    +
    +        if (!isWriteActive) {
    +            if (!currentRowInBuffer) {
    +                LOG.error("writeNextObject(): Failed (not enough memory in 
'bufferWrite')");
    +                throw new OutOfMemoryError("writeNextObject(): not enough 
memory in 'bufferWrite'");
    +            }
    +            // First row: send it immediately, so that problems (e.g. with the connection)
    +            // are revealed on the first write
    +            LOG.info("Ignite write request. Query: '" + queryWrite + "'");
    +            sendInsertRestRequest(queryWrite);
    +            // NOTE(review): removeFirst() throws NoSuchElementException if
    +            // sendInsertRestRequest() already emptied 'bufferWrite'. The flush below does
    +            // not remove rows itself, so confirm how that helper consumes the buffer.
    +            bufferWrite.removeFirst();
    +            isWriteActive = true;
    +            return true;
    +        }
    +
    +        // Flush when the buffer has reached the configured size
    +        if ((bufferWrite.size() >= bufferSize) || (!currentRowInBuffer)) {
    +            sendInsertRestRequest(queryWrite);
    +        }
    +
    +        // Retry buffering the row after a flush (unreachable with LinkedList, see above)
    +        if (!currentRowInBuffer) {
    +            if (!bufferWrite.add(currentRow)) {
    +                LOG.error("writeNextObject(): Failed (not enough memory in 
'bufferSend')");
    +                throw new OutOfMemoryError("writeNextObject(): not enough 
memory in 'bufferSend'");
    +            }
    +        }
    +        
    +        return true;
    +    }
    +
    +    /**
    +     * closeForWrite() implementation
    +     */
    +    @Override
    +    public void closeForWrite() throws Exception {
    +        if (!bufferWrite.isEmpty()) {
    +            sendInsertRestRequest(queryWrite);
    +        }
    +        if (isWriteActive) {
    +            // At this point, the request must have finished successfully; 
otherwise an exception would be thrown before
    +            LOG.info("Ignite write request finished successfully. Query: 
'" + queryWrite + "'");
    +        }
    +        isWriteActive = false;
    +    }
    +
    +    /**
    +     * Build HTTP GET query for Ignite REST API with command 'qryfldexe'
    +     * 
    +     * @param querySql SQL query with filter constants. The constants must 
replaced by "?" if 'filterConstants' is not given
    +     * @param filterConstants
    +     * 
    +     * @return Prepared HTTP query. The query will be properly encoded 
with {@link java.net.URLEncoder}
    +     * 
    +     * @throws UnsupportedEncodingException from {@link 
java.net.URLEncoder.encode()}
    +     */
    +    private String buildQueryFldexe(String querySql, ArrayList<String> 
filterConstants) throws UnsupportedEncodingException {
    +        StringBuilder sb = new StringBuilder();
    +        sb.append("http://";);
    +        sb.append(igniteHost);
    +        sb.append("/ignite");
    +        sb.append("?");
    +        sb.append("cmd=qryfldexe");
    +        sb.append("&");
    +        sb.append("pageSize=0");
    +        sb.append("&");
    +        if (cacheName != null) {
    +            sb.append("cacheName=");
    +            // Note that Ignite supports only "good" cache names (those 
that should be left unchanged by the URLEncoder.encode())
    +            sb.append(URLEncoder.encode(cacheName, "UTF-8"));
    +            sb.append("&");
    +        }
    +        int counter = 1;
    +        if (filterConstants != null) {
    --- End diff --
    
    Could you please explain the filter construction? It looks like it creates
    something like
    `arg1=A&arg2=B&arg3=C`
    How do we know which field argX stands for? And if we want to filter by the
    last field, how can it be done? (I would think that in that case we would have to
    pass filterConstants with empty values as placeholders for the other
    fields?)
    If I understand correctly, this code is never executed because we always
    send filterConstants as null, so I just want to make sure it will work as
    expected.


---

Reply via email to