Github user leskin-in commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/1344#discussion_r170735616
  
    --- Diff: 
pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgniteAccessor.java
 ---
    @@ -0,0 +1,502 @@
    +package org.apache.hawq.pxf.plugins.ignite;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.UserDataException;
    +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.plugins.ignite.IgnitePlugin;
    +
    +import java.util.ArrayList;
    +import java.util.Iterator;
    +import java.util.LinkedList;
    +import java.util.regex.Matcher;
    +import java.util.regex.Pattern;
    +import java.io.InputStreamReader;
    +import java.io.BufferedReader;
    +import java.io.IOException;
    +import java.io.UnsupportedEncodingException;
    +import java.net.URL;
    +import java.net.URLEncoder;
    +import java.net.MalformedURLException;
    +import java.net.ProtocolException;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +
    +import com.google.gson.JsonParser;
    +import com.google.gson.JsonElement;
    +import com.google.gson.JsonArray;
    +
    +
    +/**
    + * Ignite database read and write accessor
    + */
    +public class IgniteAccessor extends IgnitePlugin implements ReadAccessor, 
WriteAccessor {
    +    private static final Log LOG = LogFactory.getLog(IgniteAccessor.class);
    +
    +    // Prepared URLs to send to Ignite when reading data
    +    private String urlReadStart = null;
    +    private String urlReadFetch = null;
    +    private String urlReadClose = null;
    +    // Set to true when Ignite reported all the data for the SELECT query 
was retreived
    +    private boolean isLastReadFinished = false;
    +    // A buffer to store the SELECT query results (without Ignite metadata)
    +    private LinkedList<JsonArray> bufferRead = new LinkedList<JsonArray>();
    +
    +    // A template for the INSERT 
    +    private String queryWrite = null;
    +    // Set to true when the INSERT operation is in progress
    +    private boolean isWriteActive = false;
    +    // A buffer to store prepared values for the INSERT query
    +    private LinkedList<OneRow> bufferWrite = new LinkedList<OneRow>();
    +    
    +
    +    /**
    +     * Class constructor.
    +     */
    +    public IgniteAccessor(InputData inputData) throws UserDataException {
    +        super(inputData);
    +    }
    +
    +    /**
    +     * openForRead() implementation
    +     */
    +    @Override
    +    public boolean openForRead() throws Exception {
    +        if (bufferSize == 0) {
    +            bufferSize = 1;
    +        }
    +
    +        StringBuilder sb = new StringBuilder();
    +
    +        // Insert a list of fields to be selected
    +        ArrayList<ColumnDescriptor> columns = 
inputData.getTupleDescription();
    +        if (columns == null) {
    +            throw new UserDataException("Tuple description must be 
present.");
    +        }
    +        sb.append("SELECT ");
    +        for (int i = 0; i < columns.size(); i++) {
    +            ColumnDescriptor column = columns.get(i);
    +            if (i > 0) {
    +                sb.append(",");
    +            }
    +            sb.append(column.columnName());
    +        }
    +
    +        // Insert the name of the table to select values from
    +        sb.append(" FROM ");
    +        String tableName = inputData.getDataSource();
    +        if (tableName == null) {
    +            throw new UserDataException("Table name must be set as 
DataSource.");
    +        }
    +        sb.append(tableName);
    +
    +        // Insert query constraints
    +        // Note: Filter constants may be provided separately from the 
query itself, mostly for the safety of the SQL queries; at the moment, however, 
they are passed in the query itself.
    +        ArrayList<String> filterConstants = null;
    +        if (inputData.hasFilter()) {
    +            WhereSQLBuilder filterBuilder = new WhereSQLBuilder(inputData);
    +            String whereSql = filterBuilder.buildWhereSQL();
    +
    +            if (whereSql != null) {
    +                sb.append(" WHERE ").append(whereSql);
    +            }
    +        }
    +
    +        // Insert partition constaints
    +        sb = new 
IgnitePartitionFragmenter(inputData).buildFragmenterSql(sb);
    +
    +        // Format URL
    +        urlReadStart = buildQueryFldexe(sb.toString(), filterConstants);
    +
    +        // Send the first REST request that opens the connection
    +        JsonElement response = sendRestRequest(urlReadStart);
    --- End diff --
    
    This check is performed by `sendRestRequest()` function. If the request was 
not successfull, `sendRestRequest` will throw a 
`java.net.MalformedURLException`, `java.io.IOException` or 
`java.net.ProtocolException`. All these errors will include the URL that caused 
them and (in case of error reported by Ignite in it's response) additional 
data. As these exceptions should be passed "up", they are not handled here. See 
https://github.com/apache/incubator-hawq/pull/1344/files#diff-f5894bad29c44342eee2760e259a91d6R480
 for the details.
    
    When the exception is thrown by `readNextObject()`, the procedure 
`closeForRead()` is called, as it would be in case `false` was returned. So 
both variants (`return false` and `throw ...`) are safe. However, (AFAIK) if 
the exception is not thrown by `readNextObject()`, the user will not get the 
message that the read went wrong. So throwing exceptions from `public` 
`IgniteAccessor` functions is necessary.


---

Reply via email to