Github user hornn commented on a diff in the pull request: https://github.com/apache/incubator-hawq/pull/1344#discussion_r171130678 --- Diff: pxf/pxf-ignite/src/main/java/org/apache/hawq/pxf/plugins/ignite/IgnitePartitionFragmenter.java --- @@ -0,0 +1,384 @@ +package org.apache.hawq.pxf.plugins.ignite; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.FragmentsStats; +import org.apache.hawq.pxf.api.UserDataException; +import org.apache.hawq.pxf.api.utilities.InputData; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.commons.lang.ArrayUtils; + +/** + * Fragmenter class for Ignite data resources. + * @note At the moment this fragmenter works just like the JDBC's one + * + * Aside from implementing PXF {@link Fragmenter}, + * this class also transforms input data path (given by user) by adding the + * information on splitting the request into a few requests for different regions. + * <br> + * The parameter Patterns + * <br> + * There are three parameters, the format is as follows: + * <br> + * <pre> + * <code>PARTITION_BY=column_name:column_type&RANGE=start_value[:end_value]&INTERVAL=interval_num[:interval_unit]</code> + * </pre> + * The <code>PARTITION_BY</code> parameter can be split by colon(':'),the <code>column_type</code> current supported : <code>date,int,enum</code> . + * The Date format is 'yyyy-MM-dd'. <br> + * The <code>RANGE</code> parameter can be split by colon(':') ,used to identify the starting range of each fragment. + * The range is left-closed, ie:<code> '>= start_value AND < end_value' </code>.If the <code>column_type</code> is <code>int</code>, + * the <code>end_value</code> can be empty. If the <code>column_type</code>is <code>enum</code>,the parameter <code>RANGE</code> can be empty. <br> + * The <code>INTERVAL</code> parameter can be split by colon(':'), indicate the interval value of one fragment. + * When <code>column_type</code> is <code>date</code>,this parameter must be split by colon, and <code>interval_unit</code> can be <code>year,month,day</code>. + * When <code>column_type</code> is <code>int</code>, the <code>interval_unit</code> can be empty. + * When <code>column_type</code> is <code>enum</code>,the <code>INTERVAL</code> parameter can be empty. + * <br> + * <p> + * The syntax examples is: + * <br> + * <code>PARTITION_BY=createdate:date&RANGE=2008-01-01:2010-01-01&INTERVAL=1:month'</code> + * <br> + * <code>PARTITION_BY=year:int&RANGE=2008:2010&INTERVAL=1</code> + * <br> + * <code>PARTITION_BY=grade:enum&RANGE=excellent:good:general:bad</code> + * </p> + */ +public class IgnitePartitionFragmenter extends Fragmenter { + Log LOG = LogFactory.getLog(IgnitePartitionFragmenter.class); + + String[] partitionBy = null; + String[] range = null; + String[] interval = null; + PartitionType partitionType = null; + String partitionColumn = null; + IntervalType intervalType = null; + int intervalNum = 1; + + //when partitionType is DATE,it is valid + Calendar rangeStart = null; + Calendar rangeEnd = null; + + + enum PartitionType { + DATE, + INT, + ENUM; + + public static PartitionType getType(String str) { + return valueOf(str.toUpperCase()); + } + } + + enum IntervalType { + DAY, + MONTH, + YEAR; + + public static IntervalType type(String str) { + return valueOf(str.toUpperCase()); + } + } + + /** + * @throws UserDataException if the request parameter is malformed + */ + public IgnitePartitionFragmenter(InputData inConf) throws UserDataException { + super(inConf); + if (LOG.isDebugEnabled()) { + LOG.debug("Constructor started"); + } + + if (inConf.getUserProperty("PARTITION_BY") == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Constructor successful; partition was not used"); + } + return; + } + + try { + partitionBy = inConf.getUserProperty("PARTITION_BY").split(":"); + partitionColumn = partitionBy[0]; + partitionType = PartitionType.getType(partitionBy[1]); + } + catch (IllegalArgumentException | ArrayIndexOutOfBoundsException e1) { + throw new UserDataException("The parameter 'PARTITION_BY' invalid, the pattern is 'column_name:date|int|enum'"); + } + + //parse and validate parameter-RANGE + try { + String rangeStr = inConf.getUserProperty("RANGE"); + if (rangeStr != null) { + range = rangeStr.split(":"); + if (range.length == 1 && partitionType != PartitionType.ENUM) { + throw new UserDataException("The parameter 'RANGE' does not specify '[:end_value]'"); + } + } + else { + throw new UserDataException("The parameter 'RANGE' must be specified along with 'PARTITION_BY'"); + } + } + catch (IllegalArgumentException e) { + throw new UserDataException("The parameter 'RANGE' invalid, the pattern is 'start_value[:end_value]'"); + } + + //parse and validate parameter-INTERVAL + try { + String intervalStr = inConf.getUserProperty("INTERVAL"); + if (intervalStr != null) { + interval = intervalStr.split(":"); + intervalNum = Integer.parseInt(interval[0]); + if (interval.length > 1) { + intervalType = IntervalType.type(interval[1]); + } + if (interval.length == 1 && partitionType == PartitionType.DATE) { + throw new UserDataException("The parameter 'INTERVAL' does not specify unit [:year|month|day]"); + } + } + else if (partitionType != PartitionType.ENUM) { + throw new UserDataException("The parameter 'INTERVAL' must be specified along with 'PARTITION_BY'"); + } + if (intervalNum < 1) { + throw new UserDataException("The parameter 'INTERVAL' must > 1, but actual is '" + intervalNum + "'"); + } + } + catch (IllegalArgumentException e) { + throw new UserDataException("The parameter 'INTERVAL' invalid, the pattern is 'interval_num[:interval_unit]'"); + } + + //parse any date values + try { + if (partitionType == PartitionType.DATE) { + SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd"); + rangeStart = Calendar.getInstance(); + rangeStart.setTime(df.parse(range[0])); + rangeEnd = Calendar.getInstance(); + rangeEnd.setTime(df.parse(range[1])); + } + } catch (ParseException e) { + throw new UserDataException("The parameter 'RANGE' has invalid date format. Expected format is 'YYYY-MM-DD'"); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Constructor successful; some partition used"); + } + } + + /** + * Returns statistics for Ignite table. Currently it's not implemented. + * @throws UnsupportedOperationException + */ + @Override + public FragmentsStats getFragmentsStats() throws UnsupportedOperationException { + throw new UnsupportedOperationException("ANALYZE for Ignite plugin is not supported"); + } + + /** + * Returns list of fragments for Ignite table queries + * + * @return a list of fragments + * @throws UnknownHostException from java.net.InetAddress.getLocalHost().getHostAddress() + */ + @Override + public List<Fragment> getFragments() throws UnknownHostException { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() called; dataSource is '" + inputData.getDataSource() + "'"); + } + + // Always use 'localhost' as a replica holder + // The replica holder is a *PXF* host which can process the requested query + // Note that this is not an Ignite host + String[] replicaHostAddressWrapped = new String[]{InetAddress.getLocalHost().getHostAddress()}; + byte[] fragmentMetadata = null; + byte[] fragmentUserdata = null; + + if (partitionType == null) { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found no partition"); + } + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() successful. Total fragments: " + fragments.size()); + } + return fragments; + } + + switch (partitionType) { + case DATE: { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found DATE partition"); + } + int currInterval = intervalNum; + + Calendar fragStart = rangeStart; + while (fragStart.before(rangeEnd)) { + Calendar fragEnd = (Calendar) fragStart.clone(); + switch (intervalType) { + case DAY: + fragEnd.add(Calendar.DAY_OF_MONTH, currInterval); + break; + case MONTH: + fragEnd.add(Calendar.MONTH, currInterval); + break; + case YEAR: + fragEnd.add(Calendar.YEAR, currInterval); + break; + } + if (fragEnd.after(rangeEnd)) + fragEnd = (Calendar) rangeEnd.clone(); + + // Build Fragment.metadata: convert the date to a millisecond, then get bytes. + byte[] msStart = ByteUtil.getBytes(fragStart.getTimeInMillis()); + byte[] msEnd = ByteUtil.getBytes(fragEnd.getTimeInMillis()); + fragmentMetadata = ArrayUtils.addAll(msStart, msEnd); + + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + + // Continue the previous fragment + fragStart = fragEnd; + } + break; + } + case INT: { + if (LOG.isDebugEnabled()) { + LOG.debug("getFragments() found INT partition"); + } + int rangeStart = Integer.parseInt(range[0]); + int rangeEnd = Integer.parseInt(range[1]); + int currInterval = intervalNum; + + // Validate : curr_interval > 0 + int fragStart = rangeStart; + while (fragStart < rangeEnd) { + int fragEnd = fragStart + currInterval; + if (fragEnd > rangeEnd) fragEnd = rangeEnd; + + byte[] bStart = ByteUtil.getBytes(fragStart); + byte[] bEnd = ByteUtil.getBytes(fragEnd); + fragmentMetadata = ArrayUtils.addAll(bStart, bEnd); + + Fragment fragment = new Fragment(inputData.getDataSource(), replicaHostAddressWrapped, fragmentMetadata, fragmentUserdata); + fragments.add(fragment); + + // Continue the previous fragment + fragStart = fragEnd;// + 1; --- End diff -- remove comment?
---