OOZIE-2777 Config-default.xml longer than 64k results in java.io.UTFDataFormatException (gezapeti via harsh)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a9141968 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a9141968 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a9141968 Branch: refs/heads/oya Commit: a914196882e845b9908f041c5b0d8712e4eada6b Parents: 0299e5d Author: Harsh J <ha...@cloudera.com> Authored: Thu Feb 2 16:23:08 2017 +0530 Committer: Harsh J <ha...@cloudera.com> Committed: Thu Feb 2 16:23:08 2017 +0530 ---------------------------------------------------------------------- .../oozie/util/StringSerializationUtil.java | 72 +++++++++++++++++ .../oozie/workflow/lite/LiteWorkflowApp.java | 61 +++++---------- .../workflow/lite/LiteWorkflowInstance.java | 33 +------- .../org/apache/oozie/workflow/lite/NodeDef.java | 21 +++-- .../oozie/command/wf/TestSubmitXCommand.java | 47 ++++++++++- .../oozie/util/TestStringSerializationUtil.java | 53 +++++++++++++ .../workflow/lite/TestLiteWorkflowApp.java | 78 +++++++++++++++++++ .../test/resources/oldWorkFlowApp.serialized | Bin 0 -> 68945 bytes release-log.txt | 1 + 9 files changed, 288 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/main/java/org/apache/oozie/util/StringSerializationUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/StringSerializationUtil.java b/core/src/main/java/org/apache/oozie/util/StringSerializationUtil.java new file mode 100644 index 0000000..6f930c0 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/StringSerializationUtil.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.util; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +/** + * This class provides a workaround for the 64k (65535-byte) limit on strings written with DataOutput.writeUTF. + */ +public final class StringSerializationUtil { + // Using an unusual string as the version marker so that it is unlikely to + // match user data (a short value equal to it would be misread by readString). + private static final String DATA_VERSION = "V==1"; + private static final int CONVERSION_TRESHOLD = 60000; + + private StringSerializationUtil() { + } + + /** + * Writes value to dOut. If the UTF-8 encoding of the string is longer than 60k bytes, it is written as a length-prefixed byte array instead of with writeUTF.
+ * + * @param dOut + * @param value + * @throws IOException + */ + public static void writeString(DataOutput dOut, String value) throws IOException { + if (value == null) { + dOut.writeUTF(value); + return; + } + + byte[] data = value.getBytes(StandardCharsets.UTF_8.name()); + if (data.length > CONVERSION_TRESHOLD) { + dOut.writeUTF(DATA_VERSION); + dOut.writeInt(data.length); + dOut.write(data); + } else { + dOut.writeUTF(value); + } + } + + public static String readString(DataInput dIn) throws IOException { + String value = dIn.readUTF(); + if (DATA_VERSION.equals(value)) { + int length = dIn.readInt(); + byte[] data = new byte[length]; + dIn.readFully(data); + value = new String(data, StandardCharsets.UTF_8.name()); + } + return value; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowApp.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowApp.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowApp.java index a06595d..5e6dc7a 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowApp.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowApp.java @@ -20,24 +20,26 @@ package org.apache.oozie.workflow.lite; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.oozie.util.StringSerializationUtil; import org.apache.oozie.workflow.WorkflowApp; import org.apache.oozie.workflow.WorkflowException; import org.apache.oozie.util.ParamChecker; -import org.apache.oozie.util.XLog; import org.apache.oozie.ErrorCode; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; //TODO javadoc public class 
LiteWorkflowApp implements Writable, WorkflowApp { + /** + * The serialization format of the definition changed to support strings longer than 64k. This flag marks which format to use during reading. + */ + public static final int NEW_SERIALIZATION_METHOD_FLAG = -1; private String name; private String definition; private Map<String, NodeDef> nodesMap = new LinkedHashMap<String, NodeDef>(); @@ -111,13 +113,9 @@ public class LiteWorkflowApp implements Writable, WorkflowApp { @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(name); - //dataOutput.writeUTF(definition); - //writeUTF() has limit 65535, so split long string to multiple short strings - List<String> defList = divideStr(definition); - dataOutput.writeInt(defList.size()); - for (String d : defList) { - dataOutput.writeUTF(d); - } + // write out -1 as a marker to use StringSerializationUtil. Previously the definition was split into 20k-character chunks written as a list. + dataOutput.writeInt(NEW_SERIALIZATION_METHOD_FLAG); + StringSerializationUtil.writeString(dataOutput, definition); dataOutput.writeInt(nodesMap.size()); for (NodeDef n : getNodeDefs()) { dataOutput.writeUTF(n.getClass().getName()); @@ -125,43 +123,22 @@ public class LiteWorkflowApp implements Writable, WorkflowApp { } } - /** - * To split long string to a list of smaller strings.
- * - * @param str - * @return List - */ - private List<String> divideStr(String str) { - List<String> list = new ArrayList<String>(); - int len = 20000; - int strlen = str.length(); - int start = 0; - int end = len; - - while (end < strlen) { - list.add(str.substring(start, end)); - start = end; - end += len; - } - - if (strlen <= end) { - list.add(str.substring(start, strlen)); - } - return list; - } - @Override public void readFields(DataInput dataInput) throws IOException { name = dataInput.readUTF(); - //definition = dataInput.readUTF(); //read the full definition back - int defListSize = dataInput.readInt(); - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < defListSize; i++) { - sb.append(dataInput.readUTF()); + int definitionListFlag = dataInput.readInt(); + if(definitionListFlag > NEW_SERIALIZATION_METHOD_FLAG) { + // negative number marking the usage of StringSerializationUtil + // positive number is the length of the array the String was broken into. + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < definitionListFlag; i++) { + sb.append(dataInput.readUTF()); + } + definition = sb.toString(); + } else { + definition = StringSerializationUtil.readString(dataInput); } - definition = sb.toString(); - int numNodes = dataInput.readInt(); for (int x = 0; x < numNodes; x++) { try { http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java index 2b13e67..560353c 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowInstance.java @@ -25,6 +25,7 @@ import org.apache.oozie.ErrorCode; import org.apache.oozie.client.OozieClient; 
import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.XLogService; +import org.apache.oozie.util.StringSerializationUtil; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; @@ -52,10 +53,6 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { private static String ROOT = PATH_SEPARATOR; private static String TRANSITION_SEPARATOR = "#"; - // Using unique string to indicate version. This is to make sure that it - // doesn't match with user data. - private static final String DATA_VERSION = "V==1"; - private static class NodeInstance { String nodeName; boolean started = false; @@ -556,6 +553,7 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { @Override public void write(DataOutput dOut) throws IOException { + dOut.writeUTF(instanceId); //Hadoop Configuration has to get its act right @@ -577,7 +575,7 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { dOut.writeInt(persistentVars.size()); for (Map.Entry<String, String> entry : persistentVars.entrySet()) { dOut.writeUTF(entry.getKey()); - writeStringAsBytes(entry.getValue(), dOut); + StringSerializationUtil.writeString(dOut, entry.getValue()); } } @@ -607,34 +605,11 @@ public class LiteWorkflowInstance implements Writable, WorkflowInstance { int numVars = dIn.readInt(); for (int x = 0; x < numVars; x++) { String vName = dIn.readUTF(); - String vVal = readBytesAsString(dIn); - persistentVars.put(vName, vVal); + persistentVars.put(vName, StringSerializationUtil.readString(dIn)); } refreshLog(); } - private void writeStringAsBytes(String value, DataOutput dOut) throws IOException { - if (value == null) { - dOut.writeUTF(null); - return; - } - dOut.writeUTF(DATA_VERSION); - byte[] data = value.getBytes("UTF-8"); - dOut.writeInt(data.length); - dOut.write(data); - } - - private String readBytesAsString(DataInput dIn) throws IOException { - String 
value = dIn.readUTF(); - if (value != null && value.equals(DATA_VERSION)) { - int length = dIn.readInt(); - byte[] data = new byte[length]; - dIn.readFully(data); - value = new String(data, "UTF-8"); - } - return value; - } - @Override public Configuration getConf() { return conf; http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java index 496b008..9ecad01 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java @@ -20,6 +20,7 @@ package org.apache.oozie.workflow.lite; import org.apache.hadoop.io.Writable; import org.apache.oozie.service.LiteWorkflowStoreService; +import org.apache.oozie.util.StringSerializationUtil; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.workflow.WorkflowException; @@ -151,7 +152,7 @@ public class NodeDef implements Writable { throw new IOException(ex); } } - conf = dataInput.readUTF(); + conf = readString(dataInput); if (conf.equals("null")) { conf = null; } @@ -195,7 +196,7 @@ public class NodeDef implements Writable { throw new IOException(ex); } } - conf = dataInput.readUTF(); + conf = readString(dataInput); if (conf.equals("null")) { conf = null; } @@ -237,7 +238,7 @@ public class NodeDef implements Writable { } dataOutput.writeUTF(handlerClass.getName()); if (conf != null) { - dataOutput.writeUTF(conf); + writeString(dataOutput, conf); } else { dataOutput.writeUTF("null"); @@ -248,6 +249,14 @@ public class NodeDef implements Writable { } } + private void writeString(DataOutput dataOutput, String value) throws IOException { + StringSerializationUtil.writeString(dataOutput, value); + } + + private String readString(DataInput dataInput) throws IOException 
{ + return StringSerializationUtil.readString(dataInput); + } + /** * Write as version one format, this version was since 3.1. * @@ -281,14 +290,14 @@ public class NodeDef implements Writable { dataOutput.writeUTF(nodeDefVersion); dataOutput.writeUTF(name); if (cred != null) { - dataOutput.writeUTF(cred); + writeString(dataOutput, cred); } else { dataOutput.writeUTF("null"); } - dataOutput.writeUTF(handlerClass.getName()); + writeString(dataOutput, handlerClass.getName()); if (conf != null) { - dataOutput.writeUTF(conf); + writeString(dataOutput, conf); } else { dataOutput.writeUTF("null"); http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java index 47ff8ca..e71a662 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitXCommand.java @@ -27,6 +27,7 @@ import java.io.PrintWriter; import java.io.StringReader; import java.net.URI; +import org.apache.commons.lang.RandomStringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowActionBean; @@ -122,6 +123,50 @@ public class TestSubmitXCommand extends XDataTestCase { } } + public void testSubmitLongXml() throws Exception { + Configuration conf = new XConfiguration(); + String workflowUri = getTestCaseFileUri("workflow.xml"); + String actionXml = "<map-reduce>" + + "<job-tracker>${jobTracker}</job-tracker>" + + "<name-node>${nameNode}</name-node>" + + " <prepare>" + + " <delete path=\"${nameNode}/user/${wf:user()}/mr/${outputDir}\"/>" + + " </prepare>" + + " <configuration>" + + " <property><name>bb</name><value>BB</value></property>" + + " 
<property><name>cc</name><value>from_action</value></property>" + + " </configuration>" + + " </map-reduce>"; + String appXml = "<workflow-app xmlns='uri:oozie:workflow:0.5' name='too-long-wf'> " + + "<global><configuration>"+generate64kData()+"</configuration></global><start to='mr-node' /> " + + " <action name=\"mr-node\">" + + actionXml + + " <ok to=\"end\"/>" + + " <error to=\"end\"/>" + + "</action>" + + "<end name='end' /> " + "</workflow-app>"; + + writeToFile(appXml, workflowUri); + conf.set(OozieClient.APP_PATH, workflowUri); + conf.set(OozieClient.USER_NAME, getTestUser()); + addBunchOfProperties(conf); + + SubmitXCommand sc = new SubmitXCommand(conf); + sc.call(); + } + + private void addBunchOfProperties(Configuration conf) { + int i=0; + while(conf.size() < 10000){ + conf.set("ID"+i, i+"something"); + i++; + } + } + + private String generate64kData() { + return "<property><name>radnom</name><value>"+ RandomStringUtils.randomAlphanumeric(70000)+"</value></property>"; + } + public void testAppPathIsFile1() throws Exception { Configuration conf = new XConfiguration(); String workflowUri = getTestCaseFileUri("workflow.xml"); @@ -375,8 +420,8 @@ public class TestSubmitXCommand extends XDataTestCase { assertEquals(getNameNodeUri()+"/default-output-dir", actionConf.get("mixed")); } + private void writeToFile(String appXml, String appPath) throws IOException { - // TODO Auto-generated method stub File wf = new File(URI.create(appPath)); PrintWriter out = null; try { http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/test/java/org/apache/oozie/util/TestStringSerializationUtil.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/util/TestStringSerializationUtil.java b/core/src/test/java/org/apache/oozie/util/TestStringSerializationUtil.java new file mode 100644 index 0000000..1c580db --- /dev/null +++ 
b/core/src/test/java/org/apache/oozie/util/TestStringSerializationUtil.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.util; + +import org.apache.commons.lang.RandomStringUtils; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestStringSerializationUtil { + + @Test + public void testStrings() { + for (int i = 1; i < 150000; i += 10000) { + String value = RandomStringUtils.random(i); + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutput dataOutput = new DataOutputStream(baos); + StringSerializationUtil.writeString(dataOutput, value); + DataInput dataInput = new DataInputStream(new ByteArrayInputStream(baos.toByteArray())); + assertEquals("Error in serialization for size " + i, + value, StringSerializationUtil.readString(dataInput)); + } catch (IOException e) { + e.printStackTrace(); + fail("Error in serialization for size " + i 
+ "\n" + value); + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowApp.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowApp.java b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowApp.java new file mode 100644 index 0000000..74c1618 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowApp.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.workflow.lite; + +import com.google.common.base.Strings; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.oozie.service.LiteWorkflowStoreService; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XTestCase; +import org.apache.oozie.util.IOUtils; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; + + +public class TestLiteWorkflowApp extends XTestCase { + + @Override + protected void setUp() throws Exception { + super.setUp(); + new Services().init(); + } + + @Override + protected void tearDown() throws Exception { + Services.get().destroy(); + super.tearDown(); + } + + @Test + public void testReadWrite() throws Exception{ + String definition = "test"+ RandomStringUtils.random(100 * 1024); + LiteWorkflowApp app = new LiteWorkflowApp("name", definition, new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "foo")); + app.addNode(new EndNodeDef("foo", LiteWorkflowStoreService.LiteControlNodeHandler.class)); + ByteArrayOutputStream baos= new ByteArrayOutputStream(); + DataOutputStream out = new DataOutputStream(baos); + app.write(out); + ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); + DataInputStream in = new DataInputStream(bais); + LiteWorkflowApp app2 = new LiteWorkflowApp(); + app2.readFields(in); + assertTrue(app.equals(app2)); + } + + /** + * Before OOZIE-2777 the "definition" field of LiteWorkflowApp was split into 20k long strings and serialized out + * one after the other.
+ * @throws Exception + */ + @Test + public void testOldFormatRead() throws Exception{ + String definition = Strings.repeat("abcdefghijk", 6234); + LiteWorkflowApp app = new LiteWorkflowApp("name", definition, new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "foo")); + app.addNode(new EndNodeDef("foo", LiteWorkflowStoreService.LiteControlNodeHandler.class)); + DataInputStream in = new DataInputStream(IOUtils.getResourceAsStream("oldWorkFlowApp.serialized", -1)); + LiteWorkflowApp app2 = new LiteWorkflowApp(); + app2.readFields(in); + assertTrue(app.equals(app2)); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/core/src/test/resources/oldWorkFlowApp.serialized ---------------------------------------------------------------------- diff --git a/core/src/test/resources/oldWorkFlowApp.serialized b/core/src/test/resources/oldWorkFlowApp.serialized new file mode 100644 index 0000000..be2e2cd Binary files /dev/null and b/core/src/test/resources/oldWorkFlowApp.serialized differ http://git-wip-us.apache.org/repos/asf/oozie/blob/a9141968/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f7fd833..e1730a1 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2777 Config-default.xml longer than 64k results in java.io.UTFDataFormatException (gezapeti via harsh) OOZIE-2782 Input logic wait documentation is confusing (puru) OOZIE-2781 HCat partition available notification is not sent to coordinator actions if coordinator job is using a different hostname (cname, IP address, etc. ) for HCat URL (puru) OOZIE-2770 Show missing dependencies for coord actions (puru)