This is an automated email from the ASF dual-hosted git repository. djwang pushed a commit to branch merge-with-upstream in repository https://gitbox.apache.org/repos/asf/cloudberry-pxf.git
commit a2fe875bbc5813c899c26ce7ce9339431fc9caff Author: Ashuka Xue <[email protected]> AuthorDate: Wed Sep 7 12:40:15 2022 -0700 Add JsonProtocolHandler to use HdfsFileFragmenter for multi-line JSON (#858) This commit fixes an issue that could occur with multi-line JSON files. Previously, it was possible for a fragment to improperly parse a JSON object if the split started in the middle of a string, causing wrong results. This commit now uses the HdfsFileFragmenter for multi-line JSON files. --- .../pxf/plugins/json/JsonProtocolHandler.java | 13 ++----- .../pxf/plugins/json/JsonProtocolHandlerTest.java | 17 --------- .../pxf/plugins/s3/S3ProtocolHandler.java | 6 +++- .../pxf/plugins/s3/S3ProtocolHandlerTest.java | 42 +++------------------- .../src/main/resources/pxf-profiles-default.xml | 2 +- 5 files changed, 13 insertions(+), 67 deletions(-) diff --git a/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java b/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java index 0bf61661..9673212e 100644 --- a/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java +++ b/server/pxf-json/src/main/java/org/greenplum/pxf/plugins/json/JsonProtocolHandler.java @@ -18,7 +18,7 @@ public class JsonProtocolHandler implements ProtocolHandler { @Override public String getFragmenterClassName(RequestContext context) { String fragmenter = context.getFragmenter(); // default to fragmenter defined by the profile - if (splitByFile(context)) { + if (useMultilineJson(context)) { fragmenter = HCFS_FILE_FRAGMENTER; } LOG.debug("Determined to use {} fragmenter", fragmenter); @@ -35,14 +35,7 @@ public class JsonProtocolHandler implements ProtocolHandler { return context.getResolver(); } - /** - * Determine if the HdfsFileFragmenter should be used instead of the default fragmenter. - * This determination is dictated by the SPLIT_BY_FILE parameter which is provided in the LOCATION uri. - * - * @param context the request context - * @return true if the HdfsFileFragmenter should be used, false otherwise - */ - public boolean splitByFile(RequestContext context) { - return context.getOption("split_by_file", false); + public boolean useMultilineJson(RequestContext context) { + return isNotEmpty(context.getOption("identifier")); } } diff --git a/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java b/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java index 9717fc67..b90ab9c4 100644 --- a/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java +++ b/server/pxf-json/src/test/java/org/greenplum/pxf/plugins/json/JsonProtocolHandlerTest.java @@ -50,23 +50,6 @@ public class JsonProtocolHandlerTest { @Test public void testWithIdentifier() { context.addOption("IDENTIFIER", "c1"); - assertEquals(DEFAULT_FRAGMENTER, handler.getFragmenterClassName(context)); - assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context)); - assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context)); - } - - @Test - public void testWithIdentifierUseFileFragmenter() { - context.addOption("IDENTIFIER", "c1"); - context.addOption("SPLIT_BY_FILE", "true"); - assertEquals(FILE_FRAGMENTER, handler.getFragmenterClassName(context)); - assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context)); - assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context)); - } - - @Test - public void testUseFileFragmenter() { - context.addOption("SPLIT_BY_FILE", "true"); assertEquals(FILE_FRAGMENTER, handler.getFragmenterClassName(context)); assertEquals(DEFAULT_ACCESSOR, handler.getAccessorClassName(context)); assertEquals(DEFAULT_RESOLVER, handler.getResolverClassName(context)); diff --git a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java index bcd6f67c..62617cb1 100644 --- a/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java +++ b/server/pxf-s3/src/main/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandler.java @@ -55,7 +55,7 @@ public class S3ProtocolHandler implements ProtocolHandler { @Override public String getFragmenterClassName(RequestContext context) { String fragmenter = context.getFragmenter(); // default to fragmenter defined by the profile - if (useS3Select(context) || useFileReadForJson(context)) { + if (useS3Select(context) || useMultilineJson(context)) { fragmenter = HCFS_FILE_FRAGMENTER; } @@ -129,6 +129,10 @@ public class S3ProtocolHandler implements ProtocolHandler { } } + public boolean useMultilineJson(RequestContext context) { + return isNotEmpty(context.getOption("identifier")); + } + /** * Determine if the HdfsFileFragmenter should be used for JSON filetypes. * This determination is dictated by the SPLIT_BY_FILE parameter which is provided in the LOCATION uri. diff --git a/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java b/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java index 3c844a4e..4456afa3 100644 --- a/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java +++ b/server/pxf-s3/src/test/java/org/greenplum/pxf/plugins/s3/S3ProtocolHandlerTest.java @@ -60,6 +60,8 @@ public class S3ProtocolHandlerTest { private static final String[] EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT_HAS_HEADER = {DEFAULT_RESOLVER, STRING_PASS_RESOLVER, STRING_PASS_RESOLVER, STRING_PASS_RESOLVER, DEFAULT_RESOLVER}; private static final String[] EXPECTED_FRAGMENTER_TEXT_AUTO_NO_BENEFIT_HAS_HEADER = {DEFAULT_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, DEFAULT_FRAGMENTER}; + private static final String[] EXPECTED_FRAGMENTER_MULTILINE = {FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER, FILE_FRAGMENTER}; + private S3ProtocolHandler handler; private RequestContext context; @@ -422,19 +424,7 @@ public class S3ProtocolHandlerTest { context.setOutputFormat(OutputFormat.TEXT); verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_OFF); verifyResolvers(context, EXPECTED_RESOLVER_TEXT_OFF); - verifyFragmenters(context, EXPECTED_FRAGMENTER_TEXT_OFF); - } - - @Test - public void testTextSplitByFileAndSelectOff() { - context.addOption("S3_SELECT", "off"); - context.addOption("SPLIT_BY_FILE", "true"); - context.setOutputFormat(OutputFormat.TEXT); - verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_OFF); - verifyResolvers(context, EXPECTED_RESOLVER_TEXT_OFF); - String[] EXPECTED_FRAGMENTERS = EXPECTED_FRAGMENTER_TEXT_OFF.clone(); - EXPECTED_FRAGMENTERS[3] = FILE_FRAGMENTER; // index 3 is json - verifyFragmenters(context, EXPECTED_FRAGMENTERS); + verifyFragmenters(context, EXPECTED_FRAGMENTER_MULTILINE); } @Test @@ -448,18 +438,6 @@ public class S3ProtocolHandlerTest { verifyFragmenters(context, EXPECTED_FRAGMENTER_TEXT_ON); } - @Test - public void testSplitByFileAndSelectOn() { - // s3 options should override multiline json fragmenter option - context.addOption("S3_SELECT", "on"); - context.addOption("SPLIT_BY_FILE", "true"); - context.setOutputFormat(OutputFormat.TEXT); - verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_ON); - verifyResolvers(context, EXPECTED_RESOLVER_TEXT_ON); - String[] EXPECTED_FRAGMENTERS = EXPECTED_FRAGMENTER_TEXT_ON.clone(); - EXPECTED_FRAGMENTERS[3] = FILE_FRAGMENTER; // index 3 is json - verifyFragmenters(context, EXPECTED_FRAGMENTERS); - } @Test public void testTextIdentifierAndSelectAuto() { // s3 options should override multiline json fragmenter option @@ -468,19 +446,7 @@ public class S3ProtocolHandlerTest { context.setOutputFormat(OutputFormat.TEXT); verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_AUTO_NO_BENEFIT); verifyResolvers(context, EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT); - verifyFragmenters(context, EXPECTED_FRAGMENTER_TEXT_AUTO_NO_BENEFIT); - } - - @Test - public void testTextSplitByFileAndSelectAuto() { - context.addOption("S3_SELECT", "auto"); - context.addOption("SPLIT_BY_FILE", "true"); - context.setOutputFormat(OutputFormat.TEXT); - verifyAccessors(context, EXPECTED_ACCESSOR_TEXT_AUTO_NO_BENEFIT); - verifyResolvers(context, EXPECTED_RESOLVER_TEXT_AUTO_NO_BENEFIT); - String[] EXPECTED_FRAGMENTERS = EXPECTED_FRAGMENTER_TEXT_AUTO_NO_BENEFIT.clone(); - EXPECTED_FRAGMENTERS[3] = FILE_FRAGMENTER; // index 3 is json - verifyFragmenters(context, EXPECTED_FRAGMENTERS); + verifyFragmenters(context, EXPECTED_FRAGMENTER_MULTILINE); } private void verifyFragmenters(RequestContext context, String[] expected) { diff --git a/server/pxf-service/src/main/resources/pxf-profiles-default.xml b/server/pxf-service/src/main/resources/pxf-profiles-default.xml index 7f80649e..423c2b62 100644 --- a/server/pxf-service/src/main/resources/pxf-profiles-default.xml +++ b/server/pxf-service/src/main/resources/pxf-profiles-default.xml @@ -836,7 +836,7 @@ under the License. <resolver>org.greenplum.pxf.plugins.json.JsonResolver</resolver> </plugins> <handler>org.greenplum.pxf.plugins.json.JsonProtocolHandler</handler> - <protocol>abfss</protocol> + <protocol>adl</protocol> </profile> <profile> <name>wasbs:json</name> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
