Repository: incubator-hawq Updated Branches: refs/heads/HAWQ-703 45c63b793 -> 8c5e6f8b2 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java new file mode 100644 index 0000000..d6e8d49 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/FragmenterResource.java @@ -0,0 +1,154 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.FragmentsStats; +import org.apache.hawq.pxf.service.FragmenterFactory; +import org.apache.hawq.pxf.service.FragmentsResponse; +import org.apache.hawq.pxf.service.FragmentsResponseFormatter; +import org.apache.hawq.pxf.service.utilities.AnalyzeUtils; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.SecuredHDFS; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.servlet.ServletContext; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import java.util.List; +import java.util.Map; + +/** + * Class enhances the API of the WEBHDFS REST server. Returns the data fragments + * that a data resource is made of, enabling parallel processing of the data + * resource. Example for querying API FRAGMENTER from a web client + * {@code curl -i "http://localhost:51200/pxf/{version}/Fragmenter/getFragments?path=/dir1/dir2/*txt"} + * <code>/pxf/</code> is made part of the path when there is a webapp by that + * name in tomcat. + */ +@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Fragmenter/") +public class FragmenterResource extends RestResource { + private static final Log LOG = LogFactory.getLog(FragmenterResource.class); + + /** + * The function is called when + * {@code http://nn:port/pxf/{version}/Fragmenter/getFragments?path=...} is used. + * + * @param servletContext Servlet context contains attributes required by + * SecuredHDFS + * @param headers Holds HTTP headers from request + * @param path Holds URI path option used in this request + * @return response object with JSON serialized fragments metadata + * @throws Exception if getting fragments info failed + */ + @GET + @Path("getFragments") + @Produces("application/json") + public Response getFragments(@Context final ServletContext servletContext, + @Context final HttpHeaders headers, + @QueryParam("path") final String path) + throws Exception { + + ProtocolData protData = getProtocolData(servletContext, headers, path); + + /* Create a fragmenter instance with API level parameters */ + final Fragmenter fragmenter = FragmenterFactory.create(protData); + + List<Fragment> fragments = fragmenter.getFragments(); + + fragments = AnalyzeUtils.getSampleFragments(fragments, protData); + + FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse( + fragments, path); + + return Response.ok(fragmentsResponse, MediaType.APPLICATION_JSON_TYPE).build(); + } + + /** + * The function is called when + * {@code http://nn:port/pxf/{version}/Fragmenter/getFragmentsStats?path=...} is + * used. + * + * @param servletContext Servlet context contains attributes required by + * SecuredHDFS + * @param headers Holds HTTP headers from request + * @param path Holds URI path option used in this request + * @return response object with JSON serialized fragments statistics + * @throws Exception if getting fragments info failed + */ + @GET + @Path("getFragmentsStats") + @Produces("application/json") + public Response getFragmentsStats(@Context final ServletContext servletContext, + @Context final HttpHeaders headers, + @QueryParam("path") final String path) + throws Exception { + + ProtocolData protData = getProtocolData(servletContext, headers, path); + + /* Create a fragmenter instance with API level parameters */ + final Fragmenter fragmenter = FragmenterFactory.create(protData); + + FragmentsStats fragmentsStats = fragmenter.getFragmentsStats(); + String response = FragmentsStats.dataToJSON(fragmentsStats); + if (LOG.isDebugEnabled()) { + LOG.debug(FragmentsStats.dataToString(fragmentsStats, path)); + } + + return Response.ok(response, MediaType.APPLICATION_JSON_TYPE).build(); + } + + private ProtocolData getProtocolData(final ServletContext servletContext, + final HttpHeaders headers, + final String path) throws Exception { + + if (LOG.isDebugEnabled()) { + StringBuilder startMsg = new StringBuilder( + "FRAGMENTER started for path \"" + path + "\""); + for (String header : headers.getRequestHeaders().keySet()) { + startMsg.append(" Header: ").append(header).append(" Value: ").append( + headers.getRequestHeader(header)); + } + LOG.debug(startMsg); + } + + /* Convert headers into a case-insensitive regular map */ + Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders()); + + /* Store protocol level properties and verify */ + ProtocolData protData = new ProtocolData(params); + if (protData.getFragmenter() == null) { + protData.protocolViolation("fragmenter"); + } + SecuredHDFS.verifyToken(protData, servletContext); + + return protData; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java new file mode 100644 index 0000000..5a9f0d1 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/InvalidPathResource.java @@ -0,0 +1,179 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.utilities.Utilities; + +import com.google.common.collect.ImmutableSet; + +import java.util.Arrays; +import java.util.List; + +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.PathSegment; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; +import javax.ws.rs.core.UriInfo; + +/** + * Class for catching paths that are not defined by other resources. + * NOTE: This resource must be accessible without any security checks + * as it is used to verify proper load of the PXF webapp. + * + * For each path, the version is compared to the current version PXF_VERSION. + * The expected format of a path is "{@code http://<host>:<port>/pxf/<version>/<rest of path>}" + * + * The returned value is always a Server Error code (500). + * If the version is different than the current version, an appropriate error is returned with version details. + * Otherwise, an error about unknown path is returned. + */ +@Path("/") +public class InvalidPathResource { + @Context + UriInfo rootUri; + + private static final Log LOG = LogFactory.getLog(InvalidPathResource.class); + // Set of retired endpoints + private final ImmutableSet<String> retiredEndPoints = ImmutableSet.of( + "Analyzer"); + + public InvalidPathResource() { + } + + /** + * Catches path /pxf/ + * + * @return error message response + */ + @GET + @Path("/") + public Response noPathGet() { + return noPath(); + } + + /** + * Catches path /pxf/ + * + * @return error message response + */ + @POST + @Path("/") + public Response noPathPost() { + return noPath(); + } + + private Response noPath() { + return sendErrorMessage(getUnknownPathMsg()); + } + + /** + * Catches paths of pattern /pxf/* + * + * @param path request path + * @return error message response + */ + @GET + @Path("/{path:.*}") + public Response wrongPathGet(@PathParam("path") String path) { + return wrongPath(path); + } + + /** + * Catches paths of pattern /pxf/* + * + * @param path request path + * @return error message response + */ + @POST + @Path("/{path:.*}") + public Response wrongPathPost(@PathParam("path") String path) { + return wrongPath(path); + } + + private Response wrongPath(String path) { + + String errmsg; + + List<PathSegment> pathSegments = rootUri.getPathSegments(); + + if(pathSegments.isEmpty()) { + return sendErrorMessage(getUnknownPathMsg()); + } + + String version = pathSegments.get(0).getPath(); + String endPoint = (pathSegments.size() > 1) ? pathSegments.get(1).getPath() : null; + + LOG.debug("REST request: " + rootUri.getAbsolutePath() + ". " + + "Version " + version + ", supported version is " + Version.PXF_PROTOCOL_VERSION); + + if(version.equals(Version.PXF_PROTOCOL_VERSION)) { // api with correct version but incorrect path + if (retiredEndPoints.contains(endPoint)) { // api with retired endpoint + errmsg = getRetiredPathMsg(endPoint); + } else { + errmsg = getUnknownPathMsg(); + } + } else if(!(version.matches("v[0-9]+"))) { // api with version not of the format "v<number>" + errmsg = getUnknownPathMsg(); + } else { // api with wrong version number + errmsg = "Wrong version " + version + ", supported version is " + Version.PXF_PROTOCOL_VERSION; + } + + return sendErrorMessage(errmsg); + } + + /** + * Returns error message + */ + private Response sendErrorMessage(String message) { + ResponseBuilder b = Response.serverError(); + b.entity(message); + b.type(MediaType.TEXT_PLAIN_TYPE); + return b.build(); + } + + /** + * Returns unknown path message, with the path's special characters masked. + */ + private String getUnknownPathMsg() { + return "Unknown path \"" + Utilities.maskNonPrintables(rootUri.getAbsolutePath().toString()) + "\""; + } + + /** + * Warn on recently retired paths + * eg: http://<host>:<port>/pxf/<version>/Analyzer/<rest of path> + * + * Returns message about path not being supported + */ + private String getRetiredPathMsg(String endpoint) { + if("Analyzer".equals(endpoint)) { + return endpoint + " API is retired. Please use /Fragmenter/getFragmentsStats instead"; + } else { + return endpoint + " API is retired"; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java new file mode 100644 index 0000000..3f85bb8 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/MetadataResource.java @@ -0,0 +1,124 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.servlet.ServletContext; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.catalina.connector.ClientAbortException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hawq.pxf.api.Metadata; +import org.apache.hawq.pxf.api.MetadataFetcher; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.service.MetadataFetcherFactory; +import org.apache.hawq.pxf.service.MetadataResponse; +import org.apache.hawq.pxf.service.MetadataResponseFormatter; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.SecuredHDFS; + +/** + * Class enhances the API of the WEBHDFS REST server. Returns the metadata of a + * given hcatalog table. <br> + * Example for querying API FRAGMENTER from a web client:<br> + * <code>curl -i "http://localhost:51200/pxf/{version}/Metadata/getTableMetadata?table=t1"</code> + * <br> + * /pxf/ is made part of the path when there is a webapp by that name in tomcat. + */ +@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Metadata/") +public class MetadataResource extends RestResource { + private static final Log LOG = LogFactory.getLog(MetadataResource.class); + + public MetadataResource() throws IOException { + } + + /** + * This function queries the underlying store based on the given profile to get schema for items that match the given pattern + * metadata: Item name, field names, field types. The types are converted + * from the underlying types to HAWQ types. + * Unsupported types result in an error. <br> + * Response Examples:<br> + * For a table <code>default.t1</code> with 2 fields (a int, b float) will + * be returned as: + * <code>{"PXFMetadata":[{"item":{"path":"default","name":"t1"},"fields":[{"name":"a","type":"int"},{"name":"b","type":"float"}]}]}</code> + * + * @param servletContext servlet context + * @param headers http headers + * @param profile based on this the metadata source can be inferred + * @param pattern table/file name or pattern in the given source + * @return JSON formatted response with metadata of each item that corresponds to the pattern + * @throws Exception if connection to the source/catalog failed, item didn't exist for the pattern + * its type or fields are not supported + */ + @GET + @Path("getMetadata") + @Produces("application/json") + public Response read(@Context final ServletContext servletContext, + @Context final HttpHeaders headers, + @QueryParam("profile") final String profile, + @QueryParam("pattern") final String pattern) + throws Exception { + LOG.debug("getMetadata started"); + String jsonOutput; + try { + + // Convert headers into a regular map + Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders()); + + // Add profile and verify token + ProtocolData protData = new ProtocolData(params, profile.toLowerCase()); + + // 0. Verify token + SecuredHDFS.verifyToken(protData, servletContext); + + // 1. start MetadataFetcher + MetadataFetcher metadataFetcher = MetadataFetcherFactory.create(protData); + + // 2. get Metadata + List<Metadata> metadata = metadataFetcher.getMetadata(pattern); + + // 3. stream JSON ouptput + MetadataResponse metadataResponse = MetadataResponseFormatter.formatResponse( + metadata, pattern); + + return Response.ok(metadataResponse, MediaType.APPLICATION_JSON_TYPE).build(); + + } catch (ClientAbortException e) { + LOG.error("Remote connection closed by HAWQ", e); + throw e; + } catch (java.io.IOException e) { + LOG.error("Unhandled exception thrown", e); + throw e; + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java new file mode 100644 index 0000000..60bb31e --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/RestResource.java @@ -0,0 +1,71 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import javax.ws.rs.core.MultivaluedMap; + +import org.apache.commons.codec.CharEncoding; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +/** + * Super of all PXF REST classes + */ +public abstract class RestResource { + + private static final Log LOG = LogFactory.getLog(RestResource.class); + + /** + * Converts the request headers multivalued map to a case-insensitive + * regular map by taking only first values and storing them in a + * CASE_INSENSITIVE_ORDER TreeMap. All values are converted from ISO_8859_1 + * (ISO-LATIN-1) to UTF_8. + * + * @param requestHeaders request headers multi map. + * @return a regular case-insensitive map. + * @throws UnsupportedEncodingException if the named charsets ISO_8859_1 and + * UTF_8 are not supported + */ + public Map<String, String> convertToCaseInsensitiveMap(MultivaluedMap<String, String> requestHeaders) + throws UnsupportedEncodingException { + Map<String, String> result = new TreeMap<>( + String.CASE_INSENSITIVE_ORDER); + for (Map.Entry<String, List<String>> entry : requestHeaders.entrySet()) { + String key = entry.getKey(); + List<String> values = entry.getValue(); + if (values != null) { + String value = values.get(0); + if (value != null) { + // converting to value UTF-8 encoding + value = new String(value.getBytes(CharEncoding.ISO_8859_1), + CharEncoding.UTF_8); + LOG.trace("key: " + key + ". value: " + value); + result.put(key, value.replace("\\\"", "\"")); + } + } + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ServletLifecycleListener.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ServletLifecycleListener.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ServletLifecycleListener.java new file mode 100644 index 0000000..f7b897a --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/ServletLifecycleListener.java @@ -0,0 +1,63 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.servlet.ServletContextListener; +import javax.servlet.ServletContextEvent; + +import org.apache.hawq.pxf.service.utilities.Log4jConfigure; +import org.apache.hawq.pxf.service.utilities.SecureLogin; + +/** + * Listener on lifecycle events of our webapp + */ +public class ServletLifecycleListener implements ServletContextListener { + + private static final Log LOG = LogFactory.getLog(ServletContextListener.class); + + /** + * Called after the webapp has been initialized. + * + * 1. Initializes log4j. + * 2. Initiates a Kerberos login when Hadoop security is on. + */ + @Override + public void contextInitialized(ServletContextEvent event) { + // 1. Initialize log4j: + Log4jConfigure.configure(event); + + LOG.info("webapp initialized"); + + // 2. Initiate secure login + SecureLogin.login(); + } + + /** + * Called before the webapp is about to go down + */ + @Override + public void contextDestroyed(ServletContextEvent event) { + LOG.info("webapp about to go down"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/VersionResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/VersionResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/VersionResource.java new file mode 100644 index 0000000..bd47e8a --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/VersionResource.java @@ -0,0 +1,88 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * PXF protocol version. Any call to PXF resources should include the current + * version e.g. {@code ...pxf/v14/Bridge} + */ +class Version { + /** + * Constant which holds current protocol version. Getting replaced with + * actual value on build stage, using pxfProtocolVersion parameter from + * gradle.properties + */ + final static String PXF_PROTOCOL_VERSION = "v14"; + + public Version() { + } + + public String version; + + public String getVersion() { + return version; + } + + public void setVersion(String version) { + this.version = version; + } +} + +/** + * Class returning the protocol version used by PXF. + * + * The expected format of a path is " + * {@code http://<host>:<port>/pxf/ProtocolVersion}" The expected response is " + * {@code PXF protocol version <version>}" + * + */ +@Path("/ProtocolVersion") +public class VersionResource { + + private static final Log LOG = LogFactory.getLog(VersionResource.class); + + public VersionResource() { + } + + /** + * Returns the PXF protocol version used currently. + * + * @return response with the PXF protocol version + */ + @GET + @Produces("application/json") + public Response getProtocolVersion() { + + ResponseBuilder b = Response.ok(); + b.entity("{ \"version\": \"" + Version.PXF_PROTOCOL_VERSION + "\"}"); + b.type(MediaType.APPLICATION_JSON_TYPE); + return b.build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/WritableResource.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/WritableResource.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/WritableResource.java new file mode 100644 index 0000000..a6c8d6b --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/rest/WritableResource.java @@ -0,0 +1,174 @@ +package org.apache.hawq.pxf.service.rest; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.DataInputStream; +import java.io.InputStream; +import java.util.Map; + +import javax.servlet.ServletContext; +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import org.apache.catalina.connector.ClientAbortException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.utilities.Utilities; +import org.apache.hawq.pxf.service.Bridge; +import org.apache.hawq.pxf.service.WriteBridge; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.apache.hawq.pxf.service.utilities.SecuredHDFS; + +/* + * Running this resource manually: + * + * run: + curl -i -X post "http://localhost:51200/pxf/{version}/Writable/stream?path=/data/curl/curl`date \"+%h%d_%H%M%s\"`" \ + --header "X-GP-Accessor: TextFileWAccessor" \ + --header "X-GP-Resolver: TextWResolver" \ + --header "Content-Type:application/octet-stream" \ + --header "Expect: 100-continue" \ + --header "X-GP-ALIGNMENT: 4" \ + --header "X-GP-SEGMENT-ID: 0" \ + --header "X-GP-SEGMENT-COUNT: 3" \ + --header "X-GP-HAS-FILTER: 0" \ + --header "X-GP-FORMAT: TEXT" \ + --header "X-GP-URI: pxf://localhost:51200/data/curl/?Accessor=TextFileWAccessor&Resolver=TextWResolver" \ + --header "X-GP-URL-HOST: localhost" \ + --header "X-GP-URL-PORT: 51200" \ + --header "X-GP-ATTRS: 0" \ + --header "X-GP-DATA-DIR: data/curl/" \ + -d "data111" -d "data222" + + * result: + + HTTP/1.1 200 OK + Content-Type: text/plain;charset=UTF-8 + Content-Type: text/plain + Transfer-Encoding: chunked + Server: Jetty(7.6.10.v20130312) + + wrote 15 bytes to curlAug11_17271376231245 + + file content: + bin/hdfs dfs -cat /data/curl/*45 + data111&data222 + + */ + + +/** + * This class handles the subpath /<version>/Writable/ of this + * REST component + */ +@Path("/" + Version.PXF_PROTOCOL_VERSION + "/Writable/") +public class WritableResource extends RestResource{ + private static final Log LOG = LogFactory.getLog(WritableResource.class); + + public WritableResource() { + } + + /** + * This function is called when http://nn:port/pxf/{version}/Writable/stream?path=... + * is used. + * + * @param servletContext Servlet context contains attributes required by SecuredHDFS + * @param headers Holds HTTP headers from request + * @param path Holds URI path option used in this request + * @param inputStream stream of bytes to write from Hawq + * @return ok response if the operation finished successfully + * @throws Exception in case of wrong request parameters, failure to + * initialize bridge or to write data + */ + @POST + @Path("stream") + @Consumes(MediaType.APPLICATION_OCTET_STREAM) + public Response stream(@Context final ServletContext servletContext, + @Context HttpHeaders headers, + @QueryParam("path") String path, + InputStream inputStream) throws Exception { + + /* Convert headers into a case-insensitive regular map */ + Map<String, String> params = convertToCaseInsensitiveMap(headers.getRequestHeaders()); + if (LOG.isDebugEnabled()) { + LOG.debug("WritableResource started with parameters: " + params + " and write path: " + path); + } + + ProtocolData protData = new ProtocolData(params); + protData.setDataSource(path); + + SecuredHDFS.verifyToken(protData, servletContext); + Bridge bridge = new WriteBridge(protData); + + // THREAD-SAFE parameter has precedence + boolean isThreadSafe = protData.isThreadSafe() && bridge.isThreadSafe(); + LOG.debug("Request for " + path + " handled " + + (isThreadSafe ? "without" : "with") + " synchronization"); + + return isThreadSafe ? + writeResponse(bridge, path, inputStream) : + synchronizedWriteResponse(bridge, path, inputStream); + } + + private static synchronized Response synchronizedWriteResponse(Bridge bridge, + String path, + InputStream inputStream) + throws Exception { + return writeResponse(bridge, path, inputStream); + } + + private static Response writeResponse(Bridge bridge, + String path, + InputStream inputStream) throws Exception { + + String returnMsg; + + // Open the output file + bridge.beginIteration(); + + long totalWritten = 0; + + // dataStream will close automatically in the end of the try. + // inputStream is closed by dataStream.close(). + try (DataInputStream dataStream = new DataInputStream(inputStream)) { + while (bridge.setNext(dataStream)) { + ++totalWritten; + } + } catch (ClientAbortException e) { + LOG.debug("Remote connection closed by HAWQ", e); + } catch (Exception ex) { + LOG.debug("totalWritten so far " + totalWritten + " to " + path); + throw ex; + } + + String censuredPath = Utilities.maskNonPrintables(path); + returnMsg = "wrote " + totalWritten + " bulks to " + censuredPath; + LOG.debug(returnMsg); + + return Response.ok(returnMsg).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java new file mode 100644 index 0000000..21172c5 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/AnalyzeUtils.java @@ -0,0 +1,147 @@ +package org.apache.hawq.pxf.service.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hawq.pxf.api.Fragment; + +/** + * Helper class to get statistics for ANALYZE. + */ +public class AnalyzeUtils { + + private static final Log LOG = LogFactory.getLog(AnalyzeUtils.class); + + /** + * In case pxf_max_fragments parameter is declared, make sure not to get + * over the limit. The returned fragments are evenly distributed, in order + * to achieve good sampling. + * + * @param fragments fragments list + * @param protData container for parameters, including sampling data. + * @return a list of fragments no bigger than pxf_max_fragments parameter. + */ + static public List<Fragment> getSampleFragments(List<Fragment> fragments, + ProtocolData protData) { + + int listSize = fragments.size(); + int maxSize = protData.getStatsMaxFragments(); + List<Fragment> samplingList = new ArrayList<Fragment>(); + BitSet bitSet; + + if (maxSize == 0) { + return fragments; + } + + LOG.debug("fragments list has " + listSize + + " fragments, maxFragments = " + maxSize); + + bitSet = generateSamplingBitSet(listSize, maxSize); + + for (int i = 0; i < listSize; ++i) { + if (bitSet.get(i)) { + samplingList.add(fragments.get(i)); + } + } + + return samplingList; + } + + /** + * Marks sampleSize bits out of the poolSize, in a uniform way. + * + * @param poolSize pool size + * @param sampleSize sample size + * @return bit set with sampleSize bits set out of poolSize. + */ + static public BitSet generateSamplingBitSet(int poolSize, int sampleSize) { + + int skip = 0, chosen = 0, curIndex = 0; + BitSet bitSet = new BitSet(); + + if (poolSize <= 0 || sampleSize <= 0) { + return bitSet; + } + + if (sampleSize >= poolSize) { + LOG.debug("sampling bit map has " + poolSize + " elements (100%)"); + bitSet.set(0, poolSize); + return bitSet; + } + + skip = (poolSize / sampleSize) + 1; + + while (chosen < sampleSize) { + + bitSet.set(curIndex); + chosen++; + if (chosen == sampleSize) { + break; + } + + for (int i = 0; i < skip; ++i) { + curIndex = nextClearBitModulo((++curIndex) % poolSize, + poolSize, bitSet); + if (curIndex == -1) { + // should never happen + throw new IllegalArgumentException( + "Trying to sample more than pool size " + + "(pool size " + poolSize + + ", sampling size " + sampleSize); + } + } + } + + LOG.debug("sampling bit map has " + chosen + " elements:" + + bitSet.toString()); + + return bitSet; + } + + /** + * Returns index of next clear (false) bit, starting from and including + * index. If all bits from index to the end are set (true), search from the + * beginning. Return -1 if all bits are set (true). + * + * @param index starting point + * @param poolSize the bit set size + * @param bitSet bitset to search + * @return index of next clear bit, starting in index + */ + static private int nextClearBitModulo(int index, int poolSize, BitSet bitSet) { + + int indexToSet = bitSet.nextClearBit(index); + if (indexToSet == poolSize && index != 0) { + indexToSet = bitSet.nextClearBit(0); + } + /* means that all bits are already set, so we return -1 */ + if (indexToSet == poolSize) { + return -1; + } + + return indexToSet; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/CustomWebappLoader.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/CustomWebappLoader.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/CustomWebappLoader.java new file mode 100644 index 0000000..821f2d5 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/CustomWebappLoader.java @@ -0,0 +1,231 @@ +package org.apache.hawq.pxf.service.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; + +import org.apache.catalina.LifecycleException; +import org.apache.catalina.loader.WebappLoader; +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; + +/** + * A WebappLoader that allows a customized classpath to be added through configuration in context xml. + * Any additional classpath entry will be added to the default webapp classpath. + * + * <pre> + * <Context> + * <Loader className="org.apache.hawq.pxf.service.utilities.CustomWebappLoader" + * classpathFiles="/somedir/classpathFile1;/somedir/classpathFile2"/> + * </Context> + * </pre> + */ +public class CustomWebappLoader extends WebappLoader { + + /** + * Because this class belongs in tcServer itself, logs go into tcServer's log facility that is separate + * from the web app's log facility. + * + * Logs are directed to catalina.log file. By default only INFO or higher messages are logged. + * To change log level, add the following line to {catalina.base}/conf/logging.properties + * <code>org.apache.hawq.pxf.level = FINE/INFO/WARNING</code> (FINE = debug). + */ + private static final Log LOG = LogFactory.getLog(CustomWebappLoader.class); + + /** + * Classpath files containing path entries, separated by new line. + * Globbing is supported for the file name. + * e.g: + * somedir + * anotherdir/somejar.jar + * anotherone/hadoop*.jar + * anotherone/pxf*[0-9].jar + * Unix wildcard convention can be used to match a number of files + * (e.g. <code>*</code>, <code>[0-9]</code>, <code>?</code>), but not a number of directories. + * + * The files specified under classpathFiles must exist - if they can't be read an exception will be thrown. + */ + private String classpathFiles; + /** + * Secondary classpath files - if these files are unavailable only a warning will be logged. + */ + private String secondaryClasspathFiles; + + /** + * Constructs a WebappLoader with no defined parent class loader (actual parent will be the system class loader). + */ + public CustomWebappLoader() { + super(); + } + + /** + * Constructs a WebappLoader with the specified class loader to be defined as the parent for this ClassLoader. + * + * @param parent The parent class loader + */ + public CustomWebappLoader(ClassLoader parent) { + super(parent); + } + + /** + * <code>classpathFiles</code> attribute is automatically set from the context xml file. + * + * @param classpathFiles Files separated by <code>;</code> Which contains <code>;</code> separated list of path entries. + */ + public void setClasspathFiles(String classpathFiles) { + this.classpathFiles = classpathFiles; + } + + /** + * <code>secondaryClasspathFiles</code> attribute is automatically set from the context xml file. + * + * @param secondaryClasspathFiles Files separated by <code>;</code> Which contains <code>;</code> separated list of path entries. + */ + public void setSecondaryClasspathFiles(String secondaryClasspathFiles) { + this.secondaryClasspathFiles = secondaryClasspathFiles; + } + + /** + * Implements {@link org.apache.catalina.util.LifecycleBase#startInternal()}. + * + * @throws LifecycleException if this component detects a fatal error that prevents this component from being used. + */ + @Override + protected void startInternal() throws LifecycleException { + + addRepositories(classpathFiles, true); + addRepositories(secondaryClasspathFiles, false); + + super.startInternal(); + } + + private void addRepositories(String classpathFiles, boolean throwException) throws LifecycleException { + + for (String classpathFile : classpathFiles.split(";")) { + + String classpath = readClasspathFile(classpathFile, throwException); + if (classpath == null) { + continue; + } + + ArrayList<String> classpathEntries = trimEntries(classpath.split("[\\r\\n]+")); + LOG.info("Classpath file " + classpathFile + " has " + classpathEntries.size() + " entries"); + + for (String entry : classpathEntries) { + LOG.debug("Trying to load entry " + entry); + int repositoriesCount = 0; + Path pathEntry = Paths.get(entry); + /* + * For each entry, we look at the parent directory and try to match each of the files + * inside it to the file name or pattern in the file name (the last part of the path). + * E.g., for path '/some/path/with/pattern*', the parent directory will be '/some/path/with/' + * and the file name will be 'pattern*'. Each file under that directory matching + * this pattern will be added to the class loader repository. + */ + try (DirectoryStream<Path> repositories = Files.newDirectoryStream(pathEntry.getParent(), + pathEntry.getFileName().toString())) { + for (Path repository : repositories) { + if (addPathToRepository(repository, entry)) { + repositoriesCount++; + } + } + } catch (IOException e) { + LOG.warn("Failed to load entry " + entry + ": " + e); + } + if (repositoriesCount == 0) { + LOG.warn("Entry " + entry + " doesn't match any files"); + } + LOG.debug("Loaded " + repositoriesCount + " repositories from entry " + entry); + } + } + } + + private String readClasspathFile(String classpathFile, boolean throwException) throws LifecycleException { + String classpath = null; + try { + LOG.info("Trying to read classpath file " + classpathFile); + classpath = new String(Files.readAllBytes(Paths.get(classpathFile))); + } catch (IOException ioe) { + LOG.warn("Failed to read classpath file: " + ioe); + if (throwException) { + throw new LifecycleException("Failed to read classpath file: " + ioe, ioe); + } + } + return classpath; + } + + /** + * Returns a list of valid classpath entries, excluding null, empty and comment lines. + * @param classpathEntries original entries + * @return valid entries + */ + private ArrayList<String> trimEntries(String[] classpathEntries) { + + ArrayList<String> trimmed = new ArrayList<String>(); + int line = 0; + for (String entry : classpathEntries) { + + line++; + if (entry == null) { + LOG.debug("Skipping entry #" + line + " (null)"); + continue; + } + + entry = entry.trim(); + if (entry.isEmpty() || entry.startsWith("#")) { + LOG.debug("Skipping entry #" + line + " (" + entry + ")"); + continue; + } + trimmed.add(entry); + } + return trimmed; + } + + private boolean addPathToRepository(Path path, String entry) { + + try { + URI pathUri = path.toUri(); + String pathUriStr = pathUri.toString(); + File file = new File(pathUri); + if (!file.canRead()) { + throw new FileNotFoundException(pathUriStr + " cannot be read"); + } + addRepository(pathUriStr); + LOG.debug("Repository " + pathUriStr + " added from entry " + entry); + return true; + } catch (Exception e) { + LOG.warn("Failed to load path " + path + " to repository: " + e); + } + + return false; + } + +} + + http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/Log4jConfigure.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/Log4jConfigure.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/Log4jConfigure.java new file mode 100644 index 0000000..c2ccd20 --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/Log4jConfigure.java @@ -0,0 +1,66 @@ +package org.apache.hawq.pxf.service.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.File; + +import javax.servlet.ServletContext; +import javax.servlet.ServletContextEvent; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.log4j.PropertyConfigurator; + +public class Log4jConfigure { + + private static final Log LOG = LogFactory.getLog(Log4jConfigure.class); + + /** + * Initializes log4j logging for the webapp. + * + * Reads log4j properties file location from log4jConfigLocation parameter + * in web.xml. When not using aboslute path, the path starts from the webapp + * root directory. If the file can't be read, reverts to default + * configuration file under WEB-INF/classes/pxf-log4j.properties. + * + * @param event Servlet context, used to determine webapp root directory. + */ + public static void configure(ServletContextEvent event) { + + final String defaultLog4jLocation = "WEB-INF/classes/pxf-log4j.properties"; + + ServletContext context = event.getServletContext(); + String log4jConfigLocation = context.getInitParameter("log4jConfigLocation"); + + if (!log4jConfigLocation.startsWith(File.separator)) { + log4jConfigLocation = context.getRealPath("") + File.separator + + log4jConfigLocation; + } + + // revert to default properties file if file doesn't exist + File log4jConfigFile = new File(log4jConfigLocation); + if (!log4jConfigFile.canRead()) { + log4jConfigLocation = context.getRealPath("") + File.separator + + defaultLog4jLocation; + } + PropertyConfigurator.configure(log4jConfigLocation); + LOG.info("log4jConfigLocation = " + log4jConfigLocation); + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/ProtocolData.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/ProtocolData.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/ProtocolData.java new file mode 100644 index 0000000..ec258fa --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/ProtocolData.java @@ -0,0 +1,478 @@ +package org.apache.hawq.pxf.service.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.hawq.pxf.api.OutputFormat; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.ProfilesConf; + +/** + * Common configuration of all MetaData classes. Provides read-only access to + * common parameters supplied using system properties. + */ +public class ProtocolData extends InputData { + + private static final String TRUE_LCASE = "true"; + private static final String FALSE_LCASE = "false"; + private static final String PROP_PREFIX = "X-GP-"; + public static final int INVALID_SPLIT_IDX = -1; + + private static final Log LOG = LogFactory.getLog(ProtocolData.class); + + protected OutputFormat outputFormat; + protected int port; + protected String host; + protected String profile; + protected String token; + // statistics parameters + protected int statsMaxFragments; + protected float statsSampleRatio; + + /** + * Constructs a ProtocolData. Parses X-GP-* configuration variables. + * + * @param paramsMap contains all query-specific parameters from Hawq + */ + public ProtocolData(Map<String, String> paramsMap) { + + requestParametersMap = paramsMap; + segmentId = getIntProperty("SEGMENT-ID"); + totalSegments = getIntProperty("SEGMENT-COUNT"); + filterStringValid = getBoolProperty("HAS-FILTER"); + + if (filterStringValid) { + filterString = getProperty("FILTER"); + } + + parseFormat(getProperty("FORMAT")); + + host = getProperty("URL-HOST"); + port = getIntProperty("URL-PORT"); + + tupleDescription = new ArrayList<ColumnDescriptor>(); + recordkeyColumn = null; + parseTupleDescription(); + + /* + * accessor - will throw exception from getPropery() if outputFormat is + * BINARY and the user did not supply accessor=... or profile=... + * resolver - will throw exception from getPropery() if outputFormat is + * BINARY and the user did not supply resolver=... or profile=... + */ + profile = getOptionalProperty("PROFILE"); + if (profile != null) { + setProfilePlugins(); + } + accessor = getProperty("ACCESSOR"); + resolver = getProperty("RESOLVER"); + fragmenter = getOptionalProperty("FRAGMENTER"); + metadata = getOptionalProperty("METADATA"); + dataSource = getProperty("DATA-DIR"); + + /* Kerberos token information */ + if (UserGroupInformation.isSecurityEnabled()) { + token = getProperty("TOKEN"); + } + + parseFragmentMetadata(); + parseUserData(); + parseThreadSafe(); + parseRemoteCredentials(); + + dataFragment = INVALID_SPLIT_IDX; + parseDataFragment(getOptionalProperty("DATA-FRAGMENT")); + + statsMaxFragments = 0; + statsSampleRatio = 0; + parseStatsParameters(); + + // Store alignment for global use as a system property + System.setProperty("greenplum.alignment", getProperty("ALIGNMENT")); + } + + /** + * Constructs an InputDataBuilder from a copy. Used to create from an + * extending class. + * + * @param copy the input data to copy + */ + public ProtocolData(ProtocolData copy) { + this.requestParametersMap = copy.requestParametersMap; + this.segmentId = copy.segmentId; + this.totalSegments = copy.totalSegments; + this.outputFormat = copy.outputFormat; + this.host = copy.host; + this.port = copy.port; + this.fragmentMetadata = copy.fragmentMetadata; + this.userData = copy.userData; + this.tupleDescription = copy.tupleDescription; + this.recordkeyColumn = copy.recordkeyColumn; + this.filterStringValid = copy.filterStringValid; + this.filterString = copy.filterString; + this.dataSource = copy.dataSource; + this.accessor = copy.accessor; + this.resolver = copy.resolver; + this.fragmenter = copy.fragmenter; + this.threadSafe = copy.threadSafe; + this.remoteLogin = copy.remoteLogin; + this.remoteSecret = copy.remoteSecret; + this.token = copy.token; + this.statsMaxFragments = copy.statsMaxFragments; + this.statsSampleRatio = copy.statsSampleRatio; + } + + /** + * Constructs a ProtocolData. Parses X-GP-* configuration variables. + * + * @param paramsMap contains all query-specific parameters from Hawq + * @param profile contains the profile name + */ + public ProtocolData(Map<String, String> paramsMap, String profileString) { + requestParametersMap = paramsMap; + profile = profileString; + setProfilePlugins(); + metadata = getProperty("METADATA"); + + /* Kerberos token information */ + if (UserGroupInformation.isSecurityEnabled()) { + token = getProperty("TOKEN"); + } + } + + /** + * Sets the requested profile plugins from profile file into + * {@link #requestParametersMap}. + */ + private void setProfilePlugins() { + Map<String, String> pluginsMap = ProfilesConf.getProfilePluginsMap(profile); + checkForDuplicates(pluginsMap, requestParametersMap); + requestParametersMap.putAll(pluginsMap); + } + + /** + * Verifies there are no duplicates between parameters declared in the table + * definition and parameters defined in a profile. + * + * The parameters' names are case insensitive. + */ + private void checkForDuplicates(Map<String, String> plugins, + Map<String, String> params) { + List<String> duplicates = new ArrayList<>(); + for (String key : plugins.keySet()) { + if (params.containsKey(key)) { + duplicates.add(key); + } + } + + if (!duplicates.isEmpty()) { + throw new IllegalArgumentException("Profile '" + profile + + "' already defines: " + + String.valueOf(duplicates).replace("X-GP-", "")); + } + } + + /** + * Returns the request parameters. + * + * @return map of request parameters + */ + public Map<String, String> getParametersMap() { + return requestParametersMap; + } + + /** + * Throws an exception when the given property value is missing in request. + * + * @param property missing property name + * @throws IllegalArgumentException throws an exception with the property + * name in the error message + */ + public void protocolViolation(String property) { + String error = "Internal server error. Property \"" + property + + "\" has no value in current request"; + + LOG.error(error); + throw new IllegalArgumentException(error); + } + + /** + * Returns the value to which the specified property is mapped in + * {@link #requestParametersMap}. + * + * @param property the lookup property key + * @throws IllegalArgumentException if property key is missing + */ + private String getProperty(String property) { + String result = requestParametersMap.get(PROP_PREFIX + property); + + if (result == null) { + protocolViolation(property); + } + + return result; + } + + /** + * Returns the optional property value. Unlike {@link #getProperty}, it will + * not fail if the property is not found. It will just return null instead. + * + * @param property the lookup optional property + * @return property value as a String + */ + private String getOptionalProperty(String property) { + return requestParametersMap.get(PROP_PREFIX + property); + } + + /** + * Returns a property value as an int type. + * + * @param property the lookup property + * @return property value as an int type + * @throws NumberFormatException if the value is missing or can't be + * represented by an Integer + */ + private int getIntProperty(String property) { + return Integer.parseInt(getProperty(property)); + } + + /** + * Returns a property value as boolean type. A boolean property is defined + * as an int where 0 means false, and anything else true (like C). + * + * @param property the lookup property + * @return property value as boolean + * @throws NumberFormatException if the value is missing or can't be + * represented by an Integer + */ + private boolean getBoolProperty(String property) { + return getIntProperty(property) != 0; + } + + /** + * Returns the current output format, either {@link OutputFormat#TEXT} or + * {@link OutputFormat#BINARY}. + * + * @return output format + */ + public OutputFormat outputFormat() { + return outputFormat; + } + + /** + * Returns the server name providing the service. + * + * @return server name + */ + public String serverName() { + return host; + } + + /** + * Returns the server port providing the service. + * + * @return server port + */ + public int serverPort() { + return port; + } + + /** + * Returns Kerberos token information. + * + * @return token + */ + public String getToken() { + return token; + } + + /** + * Statistics parameter. Returns the max number of fragments to return for + * ANALYZE sampling. The value is set in HAWQ side using the GUC + * pxf_stats_max_fragments. + * + * @return max number of fragments to be processed by analyze + */ + public int getStatsMaxFragments() { + return statsMaxFragments; + } + + /** + * Statistics parameter. Returns a number between 0.0001 and 1.0, + * representing the sampling ratio on each fragment for ANALYZE sampling. + * The value is set in HAWQ side based on ANALYZE computations and the + * number of sampled fragments. + * + * @return sampling ratio + */ + public float getStatsSampleRatio() { + return statsSampleRatio; + } + + /** + * Sets the thread safe parameter. Default value - true. + */ + private void parseThreadSafe() { + + threadSafe = true; + String threadSafeStr = getOptionalProperty("THREAD-SAFE"); + if (threadSafeStr != null) { + threadSafe = parseBooleanValue(threadSafeStr); + } + } + + private boolean parseBooleanValue(String threadSafeStr) { + + if (threadSafeStr.equalsIgnoreCase(TRUE_LCASE)) { + return true; + } + if (threadSafeStr.equalsIgnoreCase(FALSE_LCASE)) { + return false; + } + throw new IllegalArgumentException("Illegal boolean value '" + + threadSafeStr + "'." + " Usage: [TRUE|FALSE]"); + } + + /** + * Sets the format type based on the input string. Allowed values are: + * "TEXT", "GPDBWritable". + * + * @param formatString format string + */ + protected void parseFormat(String formatString) { + switch (formatString) { + case "TEXT": + outputFormat = OutputFormat.TEXT; + break; + case "GPDBWritable": + outputFormat = OutputFormat.BINARY; + break; + default: + throw new IllegalArgumentException( + "Wrong value for greenplum.format " + formatString); + } + } + + /* + * Sets the tuple description for the record + */ + void parseTupleDescription() { + int columns = getIntProperty("ATTRS"); + for (int i = 0; i < columns; ++i) { + String columnName = getProperty("ATTR-NAME" + i); + int columnTypeCode = getIntProperty("ATTR-TYPECODE" + i); + String columnTypeName = getProperty("ATTR-TYPENAME" + i); + + ColumnDescriptor column = new ColumnDescriptor(columnName, + columnTypeCode, i, columnTypeName); + tupleDescription.add(column); + + if (columnName.equalsIgnoreCase(ColumnDescriptor.RECORD_KEY_NAME)) { + recordkeyColumn = column; + } + } + } + + /** + * Sets the index of the allocated data fragment + * + * @param fragment the allocated data fragment + */ + protected void parseDataFragment(String fragment) { + + /* + * Some resources don't require a fragment, hence the list can be empty. + */ + if (StringUtils.isEmpty(fragment)) { + return; + } + dataFragment = Integer.parseInt(fragment); + } + + private void parseFragmentMetadata() { + fragmentMetadata = parseBase64("FRAGMENT-METADATA", + "Fragment metadata information"); + } + + private void parseUserData() { + userData = parseBase64("FRAGMENT-USER-DATA", "Fragment user data"); + } + + private byte[] parseBase64(String key, String errName) { + String encoded = getOptionalProperty(key); + if (encoded == null) { + return null; + } + if (!Base64.isArrayByteBase64(encoded.getBytes())) { + throw new IllegalArgumentException(errName + + " must be Base64 encoded." + "(Bad value: " + encoded + + ")"); + } + byte[] parsed = Base64.decodeBase64(encoded); + LOG.debug("decoded " + key + ": " + new String(parsed)); + return parsed; + } + + private void parseRemoteCredentials() { + remoteLogin = getOptionalProperty("REMOTE-USER"); + remoteSecret = getOptionalProperty("REMOTE-PASS"); + } + + private void parseStatsParameters() { + + String maxFrags = getOptionalProperty("STATS-MAX-FRAGMENTS"); + if (!StringUtils.isEmpty(maxFrags)) { + statsMaxFragments = Integer.parseInt(maxFrags); + if (statsMaxFragments <= 0) { + throw new IllegalArgumentException("Wrong value '" + + statsMaxFragments + "'. " + + "STATS-MAX-FRAGMENTS must be a positive integer"); + } + } + + String sampleRatioStr = getUserProperty("STATS-SAMPLE-RATIO"); + if (!StringUtils.isEmpty(sampleRatioStr)) { + statsSampleRatio = Float.parseFloat(sampleRatioStr); + if (statsSampleRatio < 0.0001 || statsSampleRatio > 1.0) { + throw new IllegalArgumentException( + "Wrong value '" + + statsSampleRatio + + "'. " + + "STATS-SAMPLE-RATIO must be a value between 0.0001 and 1.0"); + } + } + + if ((statsSampleRatio > 0) != (statsMaxFragments > 0)) { + throw new IllegalArgumentException( + "Missing parameter: STATS-SAMPLE-RATIO and STATS-MAX-FRAGMENTS must be set together"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecureLogin.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecureLogin.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecureLogin.java new file mode 100644 index 0000000..6ce05ed --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecureLogin.java @@ -0,0 +1,61 @@ +package org.apache.hawq.pxf.service.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.SecurityUtil; + +/** + * This class relies heavily on Hadoop API to + * <ul> + * <li>Check need for secure login in Hadoop</li> + * <li>Parse and load .xml configuration file</li> + * <li>Do a Kerberos login with a kaytab file</li> + * <li>convert _HOST in Kerberos principal to current hostname</li> + * </ul> + * + * It uses Hadoop Configuration to parse XML configuration files.<br> + * It uses Hadoop Security to modify principal and perform the login. + * + * The major limitation in this class is its dependency on Hadoop. If Hadoop + * security is off, no login will be performed regardless of connector being + * used. + */ +public class SecureLogin { + private static final Log LOG = LogFactory.getLog(SecureLogin.class); + private static final String CONFIG_KEY_SERVICE_KEYTAB = "pxf.service.kerberos.keytab"; + private static final String CONFIG_KEY_SERVICE_PRINCIPAL = "pxf.service.kerberos.principal"; + + public static void login() { + try { + Configuration config = new Configuration(); + config.addResource("pxf-site.xml"); + + SecurityUtil.login(config, CONFIG_KEY_SERVICE_KEYTAB, + CONFIG_KEY_SERVICE_PRINCIPAL); + } catch (Exception e) { + LOG.error("PXF service login failed"); + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecuredHDFS.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecuredHDFS.java b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecuredHDFS.java new file mode 100644 index 0000000..f442a6d --- /dev/null +++ b/pxf/pxf-service/tmp/generatedSources/org/apache/hawq/pxf/service/utilities/SecuredHDFS.java @@ -0,0 +1,114 @@ +package org.apache.hawq.pxf.service.utilities; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import javax.servlet.ServletContext; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; + +/** + * The class handles security functions for handling secured HDFS + */ +public class SecuredHDFS { + private static final Log LOG = LogFactory.getLog(SecuredHDFS.class); + + /** + * The function will get the token information from parameters and call + * SecuredHDFS to verify the token. + * + * All token properties will be deserialized from string to a Token object + * + * @param protData input parameters + * @param context servlet context which contains the NN address + * + * @throws SecurityException Thrown when authentication fails + */ + public static void verifyToken(ProtocolData protData, ServletContext context) { + try { + if (UserGroupInformation.isSecurityEnabled()) { + Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(); + String tokenString = protData.getToken(); + token.decodeFromUrlString(tokenString); + + verifyToken(token.getIdentifier(), token.getPassword(), + token.getKind(), token.getService(), context); + } + } catch (IOException e) { + throw new SecurityException("Failed to verify delegation token " + + e, e); + } + } + + /** + * The function will verify the token with NameNode if available and will + * create a UserGroupInformation. + * + * Code in this function is copied from JspHelper.getTokenUGI + * + * @param identifier Delegation token identifier + * @param password Delegation token password + * @param kind the kind of token + * @param service the service for this token + * @param servletContext Jetty servlet context which contains the NN address + * + * @throws SecurityException Thrown when authentication fails + */ + private static void verifyToken(byte[] identifier, byte[] password, + Text kind, Text service, + ServletContext servletContext) { + try { + Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>( + identifier, password, kind, service); + + ByteArrayInputStream buf = new ByteArrayInputStream( + token.getIdentifier()); + DataInputStream in = new DataInputStream(buf); + DelegationTokenIdentifier id = new DelegationTokenIdentifier(); + id.readFields(in); + + final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(servletContext); + if (nn != null) { + nn.getNamesystem().verifyToken(id, token.getPassword()); + } + + UserGroupInformation userGroupInformation = id.getUser(); + userGroupInformation.addToken(token); + LOG.debug("user " + userGroupInformation.getUserName() + " (" + + userGroupInformation.getShortUserName() + + ") authenticated"); + + // re-login if necessary + userGroupInformation.checkTGTAndReloginFromKeytab(); + } catch (IOException e) { + throw new SecurityException("Failed to verify delegation token " + + e, e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/src/backend/catalog/external/externalmd.c ---------------------------------------------------------------------- diff --git a/src/backend/catalog/external/externalmd.c b/src/backend/catalog/external/externalmd.c index e65d741..926605f 100644 --- a/src/backend/catalog/external/externalmd.c +++ b/src/backend/catalog/external/externalmd.c @@ -125,6 +125,10 @@ static PxfItem *ParsePxfItem(struct json_object *pxfMD, char* profile) struct json_object *fieldType = json_object_object_get(jsonCol, "type"); pxfField->type = pstrdup(json_object_get_string(fieldType)); + + struct json_object *sourceFieldType = json_object_object_get(jsonCol, "sourceType"); + pxfField->sourceType = pstrdup(json_object_get_string(sourceFieldType)); + pxfField->nTypeModifiers = 0; elog(DEBUG1, "Parsing field %s, type %s", pxfField->name, pxfField->type); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/src/backend/utils/adt/pxf_functions.c ---------------------------------------------------------------------- diff --git a/src/backend/utils/adt/pxf_functions.c b/src/backend/utils/adt/pxf_functions.c index ee19a8b..806565a 100644 --- a/src/backend/utils/adt/pxf_functions.c +++ b/src/backend/utils/adt/pxf_functions.c @@ -86,8 +86,8 @@ Datum pxf_get_item_fields(PG_FUNCTION_ARGS) FuncCallContext *funcctx; HeapTuple tuple; Datum result; - Datum values[4]; - bool nulls[4]; + Datum values[5]; + bool nulls[5]; ItemContext *item_context; @@ -126,7 +126,7 @@ Datum pxf_get_item_fields(PG_FUNCTION_ARGS) * build tupdesc for result tuples. This must match this function's * pg_proc entry! */ - tupdesc = CreateTemplateTupleDesc(4, false); + tupdesc = CreateTemplateTupleDesc(5, false); TupleDescInitEntry(tupdesc, (AttrNumber) 1, "path", TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 2, "itemname", @@ -135,6 +135,8 @@ Datum pxf_get_item_fields(PG_FUNCTION_ARGS) TEXTOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 4, "fieldtype", TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "sourcefieldtype", + TEXTOID, -1, 0); funcctx->tuple_desc = BlessTupleDesc(tupdesc); MemoryContextSwitchTo(oldcontext); @@ -169,6 +171,7 @@ Datum pxf_get_item_fields(PG_FUNCTION_ARGS) values[1] = CStringGetTextDatum(item->name); values[2] = CStringGetTextDatum(field->name); values[3] = CStringGetTextDatum(field->type); + values[4] = CStringGetTextDatum(field->sourceType); tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); result = HeapTupleGetDatum(tuple); http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/src/bin/psql/describe.c ---------------------------------------------------------------------- diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index f1de41b..ab2aa8b 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -4263,8 +4263,13 @@ describePxfTable(const char *profile, const char *pattern, bool verbose) printQueryOpt myopt = pset.popt; printTableContent cont; int cols = 0; + if (verbose) + { + cols = 3; + } else + cols = 2; int total_numrows = 0; - char *headers[2]; + char *headers[cols]; bool printTableInitialized = false; char *previous_path = NULL; @@ -4274,11 +4279,15 @@ describePxfTable(const char *profile, const char *pattern, bool verbose) char *itemname; char *fieldname; char *fieldtype; + char *sourcefieldtype; int total_fields = 0; //needed to know how much memory allocate for current table initPQExpBuffer(&buf); - printfPQExpBuffer(&buf, "SELECT t.*, COUNT() OVER(PARTITION BY path, itemname) as total_fields FROM\n" + printfPQExpBuffer(&buf, "SELECT t.path, t.itemname, t.fieldname, t.fieldtype,"); + if (verbose) + appendPQExpBuffer(&buf, " sourcefieldtype, "); + appendPQExpBuffer(&buf,"COUNT() OVER(PARTITION BY path, itemname) as total_fields FROM\n" "pxf_get_item_fields('%s', '%s') t\n", profile, pattern); res = PSQLexec(buf.data, false); @@ -4294,7 +4303,9 @@ describePxfTable(const char *profile, const char *pattern, bool verbose) /* Header */ headers[0] = gettext_noop("Column"); headers[1] = gettext_noop("Type"); - cols = 2; + if (verbose) + headers[2] = gettext_noop("Source type"); + for (int i = 0; i < total_numrows; i++) { @@ -4303,7 +4314,14 @@ describePxfTable(const char *profile, const char *pattern, bool verbose) itemname = PQgetvalue(res, i, 1); fieldname = PQgetvalue(res, i, 2); fieldtype = PQgetvalue(res, i, 3); - total_fields = PQgetvalue(res, i, 4); + if (verbose) + { + sourcefieldtype = PQgetvalue(res, i, 4); + total_fields = PQgetvalue(res, i, 5); + } else + { + total_fields = PQgetvalue(res, i, 4); + } /* First row for current table */ if (previous_itemname == NULL @@ -4340,6 +4358,12 @@ describePxfTable(const char *profile, const char *pattern, bool verbose) /* Type */ printTableAddCell(&cont, fieldtype, false, false); + if (verbose) + { + /*Source type */ + printTableAddCell(&cont, sourcefieldtype, false, false); + } + previous_path = path; previous_itemname = itemname; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/src/include/catalog/external/itemmd.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/external/itemmd.h b/src/include/catalog/external/itemmd.h index a841d63..e6dad63 100644 --- a/src/include/catalog/external/itemmd.h +++ b/src/include/catalog/external/itemmd.h @@ -41,6 +41,9 @@ typedef struct PxfField /* type name */ char *type; + /*source type name */ + char *sourceType; + /* type modifiers, e.g. max length or precision */ int typeModifiers[2]; http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/src/include/catalog/pg_proc.h ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index f3c5e77..e818909 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -10129,8 +10129,8 @@ DESCR("bitmap(internal)"); DATA(insert OID = 3011 ( bmoptions PGNSP PGUID 12 f f t f s 2 17 f "1009 16" _null_ _null_ _null_ bmoptions - _null_ n )); DESCR("btree(internal)"); -/* pxf_get_item_fields(text, text, OUT text, OUT text, OUT text, OUT text) => SETOF pg_catalog.record */ -DATA(insert OID = 9996 ( pxf_get_item_fields PGNSP PGUID 12 f f t t v 2 2249 f "25 25" "{25,25,25,25,25,25}" "{i,i,o,o,o,o}" "{profile,pattern,path,itemname,fieldname,fieldtype}" pxf_get_item_fields - _null_ r )); +/* pxf_get_item_fields(text, text, OUT text, OUT text, OUT text, OUT text, OUT text) => SETOF pg_catalog.record */ +DATA(insert OID = 9996 ( pxf_get_item_fields PGNSP PGUID 12 f f t t v 2 2249 f "25 25" "{25,25,25,25,25,25,25}" "{i,i,o,o,o,o,o}" "{profile,pattern,path,itemname,fieldname,fieldtype,sourcefieldtype}" pxf_get_item_fields - _null_ r )); DESCR("Returns the metadata fields of external object from PXF"); /* raises deprecation error */ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/8c5e6f8b/src/include/catalog/pg_proc.sql ---------------------------------------------------------------------- diff --git a/src/include/catalog/pg_proc.sql b/src/include/catalog/pg_proc.sql index fc475e2..987b802 100644 --- a/src/include/catalog/pg_proc.sql +++ b/src/include/catalog/pg_proc.sql @@ -5348,7 +5348,7 @@ CREATE FUNCTION bmoptions(_text, bool) RETURNS bytea LANGUAGE internal STABLE STRICT AS 'bmoptions' WITH (OID=3011, DESCRIPTION="btree(internal)"); - CREATE FUNCTION pxf_get_item_fields(IN profile text, IN pattern text, OUT path text, OUT itemname text, OUT fieldname text, OUT fieldtype text) RETURNS SETOF pg_catalog.record LANGUAGE internal VOLATILE STRICT AS 'pxf_get_object_fields' WITH (OID=9996, DESCRIPTION="Returns the metadata fields of external object from PXF"); + CREATE FUNCTION pxf_get_item_fields(IN profile text, IN pattern text, OUT path text, OUT itemname text, OUT fieldname text, OUT fieldtype text, OUT sourcefieldtype text) RETURNS SETOF pg_catalog.record LANGUAGE internal VOLATILE STRICT AS 'pxf_get_object_fields' WITH (OID=9996, DESCRIPTION="Returns the metadata fields of external object from PXF"); -- raises deprecation error CREATE FUNCTION gp_deprecated() RETURNS void LANGUAGE internal IMMUTABLE AS 'gp_deprecated' WITH (OID=9997, DESCRIPTION="raises function deprecation error");