CHUKWA-772. Added ChukwaParquetWriter. (Eric Yang)
Project: http://git-wip-us.apache.org/repos/asf/chukwa/repo
Commit: http://git-wip-us.apache.org/repos/asf/chukwa/commit/0961ec16
Tree: http://git-wip-us.apache.org/repos/asf/chukwa/tree/0961ec16
Diff: http://git-wip-us.apache.org/repos/asf/chukwa/diff/0961ec16

Branch: refs/heads/master
Commit: 0961ec169e349beff2c631c785147de581f8599a
Parents: 6012e14
Author: Eric Yang <[email protected]>
Authored: Sat Jul 4 23:28:32 2015 -0700
Committer: Eric Yang <[email protected]>
Committed: Sat Jul 4 23:28:32 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 pom.xml                                         |   9 +-
 .../writer/parquet/ChukwaParquetWriter.java     | 175 +++++++++++++++++++
 src/site/apt/pipeline.apt                       |  62 +------
 .../writer/TestChukwaParquetWriter.java         | 125 +++++++++++++
 5 files changed, 314 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/chukwa/blob/0961ec16/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e23338b..f4e7204 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
 
   NEW FEATURES
 
+    CHUKWA-772. Added ChukwaParquetWriter. (Eric Yang)
+
     CHUKWA-756. Added ajax-solr UI for log search. (Eric Yang)
 
     CHUKWA-755. Added a reverse proxy to Solr for HICC. (Eric Yang)


http://git-wip-us.apache.org/repos/asf/chukwa/blob/0961ec16/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 82a7f03..ba4f745 100644
--- a/pom.xml
+++ b/pom.xml
@@ -327,6 +327,11 @@
       <artifactId>shiro-web</artifactId>
       <version>1.2.3</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.7.0</version>
+    </dependency>
   </dependencies>
 
   <developers>
@@ -1326,10 +1331,6 @@
 
   <repositories>
     <repository>
-      <id>codehaus</id>
-      <url>http://repository.codehaus.org/</url>
-    </repository>
-    <repository>
       <id>clojars</id>
       <url>http://clojars.org/repo/</url>
     </repository>


http://git-wip-us.apache.org/repos/asf/chukwa/blob/0961ec16/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
new file mode 100644
index 0000000..6104750
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/chukwa/datacollection/writer/parquet/ChukwaParquetWriter.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer.parquet;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.Calendar;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
+import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
+import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
+import org.apache.hadoop.chukwa.util.ExceptionUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+public class ChukwaParquetWriter extends PipelineableWriter {
+  private static Logger LOG = Logger.getLogger(ChukwaParquetWriter.class);
+  public static final String OUTPUT_DIR_OPT = "chukwaCollector.outputDir";
+  private int blockSize = 64 * 1024 * 1024;
+  private int pageSize = 64 * 1024;
+  private Schema avroSchema = null;
+  private AvroParquetWriter<GenericRecord> parquetWriter = null;
+  protected String outputDir = null;
+  private Calendar calendar = Calendar.getInstance();
+  private String localHostAddr = null;
+  private long rotateInterval = 300000L;
+  private long startTime = 0;
+  private Path previousPath = null;
+  private String previousFileName = null;
+  private FileSystem fs = null;
+
+  public ChukwaParquetWriter() throws WriterException {
+    this(ChukwaAgent.getStaticConfiguration());
+  }
+
+  public ChukwaParquetWriter(Configuration c) throws WriterException {
+    setup(c);
+  }
+
+  @Override
+  public void init(Configuration c) throws WriterException {
+  }
+
+  private void setup(Configuration c) throws WriterException {
+    try {
+      localHostAddr = "_" + InetAddress.getLocalHost().getHostName() + "_";
+    } catch (UnknownHostException e) {
+      localHostAddr = "-NA-";
+    }
+    outputDir = c.get(OUTPUT_DIR_OPT, "/chukwa/logs");
+    blockSize = c.getInt("dfs.blocksize", 64 * 1024 * 1024);
+    rotateInterval = c.getLong("chukwaCollector.rotateInterval", 300000L);
+    if(fs == null) {
+      try {
+        fs = FileSystem.get(c);
+      } catch (IOException e) {
+        throw new WriterException(e);
+      }
+    }
+
+    String input = "{\"namespace\": \"chukwa.apache.org\"," +
+      "\"type\": \"record\"," +
+      "\"name\": \"Chunk\"," +
+      "\"fields\": [" +
+      "{\"name\": \"dataType\", \"type\": \"string\"}," +
+      "{\"name\": \"data\", \"type\": \"bytes\"}," +
+      "{\"name\": \"source\", \"type\": \"string\"}," +
+      "{\"name\": \"stream\", \"type\": \"string\"}," +
+      "{\"name\": \"tags\", \"type\": \"string\"}," +
+      "{\"name\": \"seqId\", \"type\": [\"long\", \"null\"]}" +
+      "]" +
+      "}";
+
+    // parse the Avro schema used for Chukwa chunks
+    avroSchema = new Schema.Parser().parse(input);
+    // open the initial Parquet output file; AvroParquetWriter derives the
+    // Parquet schema from the Avro schema
+    rotate();
+  }
+
+  @Override
+  public void close() throws WriterException {
+    try {
+      parquetWriter.close();
+      fs.rename(previousPath, new Path(previousFileName + ".done"));
+    } catch (IOException e) {
+      throw new WriterException(e);
+    }
+  }
+
+  @Override
+  public CommitStatus add(List<Chunk> chunks) throws WriterException {
+    long elapsedTime = 0;
+    CommitStatus rv = ChukwaWriter.COMMIT_OK;
+    for(Chunk chunk : chunks) {
+      try {
+        GenericRecord record = new GenericData.Record(avroSchema);
+        record.put("dataType", chunk.getDataType());
+        record.put("data", ByteBuffer.wrap(chunk.getData()));
+        record.put("tags", chunk.getTags());
+        record.put("seqId", chunk.getSeqID());
+        record.put("source", chunk.getSource());
+        record.put("stream", chunk.getStreamName());
+        parquetWriter.write(record);
+        elapsedTime = System.currentTimeMillis() - startTime;
+        if(elapsedTime > rotateInterval) {
+          rotate();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to store data to HDFS.");
+        LOG.warn(ExceptionUtil.getStackTrace(e));
+      }
+    }
+    if (next != null) {
+      rv = next.add(chunks); // pass data through
+    }
+    return rv;
+  }
+
+  private void rotate() throws WriterException {
+    if(parquetWriter != null) {
+      try {
+        parquetWriter.close();
+        fs.rename(previousPath, new Path(previousFileName + ".done"));
+      } catch (IOException e) {
+        LOG.warn("Failed to close Chukwa write ahead log.");
+      }
+    }
+    startTime = System.currentTimeMillis();
+    calendar.setTimeInMillis(startTime);
+
+    String newName = new java.text.SimpleDateFormat("yyyyMMddHHmmssSSS")
+      .format(calendar.getTime());
+    newName += localHostAddr + new java.rmi.server.UID().toString();
+    newName = newName.replace("-", "");
+    newName = newName.replace(":", "");
+    newName = newName.replace(".", "");
+    newName = outputDir + "/" + newName.trim();
+    LOG.info("writing: " + newName);
+    Path path = new Path(newName);
+    try {
+      parquetWriter = new AvroParquetWriter<GenericRecord>(path, avroSchema, CompressionCodecName.SNAPPY, blockSize, pageSize);
+      previousPath = path;
+      previousFileName = newName;
+    } catch (IOException e) {
+      throw new WriterException(e);
+    }
+  }
+}


http://git-wip-us.apache.org/repos/asf/chukwa/blob/0961ec16/src/site/apt/pipeline.apt
----------------------------------------------------------------------
diff --git a/src/site/apt/pipeline.apt b/src/site/apt/pipeline.apt
index 794bea3..274e596 100644
--- a/src/site/apt/pipeline.apt
+++ b/src/site/apt/pipeline.apt
@@ -46,20 +46,12 @@ Chukwa agent to load <hbase-site.xml> from class path.
 ---
 <property>
   <name>chukwa.pipeline</name>
-  <value>org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
+  <value>org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter</value>
 </property>
 ---
 
-  In this mode, the filesystem to write to is determined by the option
-<writer.hdfs.filesystem> in <chukwa-agent-conf.xml>.
-
----
-<property>
-  <name>writer.hdfs.filesystem</name>
-  <value>hdfs://localhost:8020/</value>
-  <description>HDFS to dump to</description>
-</property>
----
+  In this mode, data is written to the HDFS cluster defined by the HADOOP_CONF_DIR environment
+variable.
 
 This is the only option that you really need to specify to get a working pipeline.
 
@@ -85,7 +77,7 @@ incoming chunks fed to them over a socket by Chukwa agent.
 ---
 <property>
   <name>chukwa.pipeline</name>
-  <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.SeqFileWriter</value>
+  <value>org.apache.hadoop.chukwa.datacollection.writer.SocketTeeWriter,org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter</value>
 </property>
 ---
 
@@ -142,23 +134,12 @@ as a whole file.  This writer is designed for high throughput environment.
 </property>
 ---
 
-SeqFileWriter
+ChukwaParquetWriter
 
-  The <SeqFileWriter> streams chunks of data to HDFS, and write data in
-temp filename with <.chukwa> suffix.  When the file is completed writing,
-the filename is renamed with <.done> suffix.  SeqFileWriter has the following
+  The <ChukwaParquetWriter> streams chunks of data to HDFS.  When a file is completely written,
+it is renamed with a <.done> suffix.  ChukwaParquetWriter has the following
 configuration in <chukwa-agent-conf.xml>.
 
-  * <<writer.hdfs.filesystem>> Location to name node address
-
----
-<property>
-  <name>writer.hdfs.filesystem</name>
-  <value>hdfs://localhost:8020/</value>
-  <description>HDFS to dump to</description>
-</property>
----
-
   * <<chukwaCollector.outputDir>> Location of collect data sink directory
 
 ---
@@ -179,35 +160,6 @@ configuration in <chukwa-agent-conf.xml>.
 </property>
 ---
 
-  * <<chukwaCollector.isFixedTimeRotatorScheme>> A flag to indicate that the
-    agent should close at a fixed offset after every rotateInterval.
-    The default value is false which uses the default scheme where
-    agents close after regular rotateIntervals.
-    If set to true then specify chukwaCollector.fixedTimeIntervalOffset value.
-    e.g., if isFixedTimeRotatorScheme is true and fixedTimeIntervalOffset is
-    set to 10000 and rotateInterval is set to 300000, then the agent will
-    close its files at 10 seconds past the 5 minute mark, if
-    isFixedTimeRotatorScheme is false, agents will rotate approximately
-    once every 5 minutes
-
----
- <property>
-   <name>chukwaCollector.isFixedTimeRotatorScheme</name>
-   <value>false</value>
- </property>
----
-
-  * <<chukwaCollector.fixedTimeIntervalOffset>> Chukwa fixed time interval
-    offset value (ms)
-
----
-<property>
-  <name>chukwaCollector.fixedTimeIntervalOffset</name>
-  <value>30000</value>
-  <description>Chukwa fixed time interval offset value (ms)</description>
-</property>
----
-
 SocketTeeWriter
 
   The <SocketTeeWriter> allows external processes to watch


http://git-wip-us.apache.org/repos/asf/chukwa/blob/0961ec16/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java b/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
new file mode 100644
index 0000000..643a8c5
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/chukwa/datacollection/writer/TestChukwaParquetWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.chukwa.datacollection.writer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.chukwa.Chunk;
+import org.apache.hadoop.chukwa.ChunkBuilder;
+import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.junit.Assert;
+
+import junit.framework.TestCase;
+
+public class TestChukwaParquetWriter extends TestCase {
+  private final static Logger LOG = Logger.getLogger(TestChukwaParquetWriter.class);
+
+  /**
+   * Test records are written properly.
+   */
+  public void testWrite() {
+    // Write 10 chunks
+    ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+    for(int i = 0; i < 10; i++) {
+      ChunkBuilder c = new ChunkBuilder();
+      c.addRecord(ByteBuffer.allocate(Integer.SIZE).putInt(i).array());
+      chunks.add(c.getChunk());
+    }
+    try {
+      Configuration conf = new Configuration();
+      String outputPath = System.getProperty("test.log.dir") + "/testParquet";
+      conf.set("chukwaCollector.outputDir", outputPath);
+      ChukwaWriter parquetWriter = new ChukwaParquetWriter(conf);
+      parquetWriter.add(chunks);
+      parquetWriter.close();
+      FileSystem fs = FileSystem.get(conf);
+      // Verify 10 chunks are written
+      Path file = new Path(outputPath);
+      FileStatus[] status = fs.listStatus(file);
+      for(FileStatus finfo : status) {
+        if(finfo.getPath().getName().contains(".done")) {
+          LOG.info("File name: " + finfo.getPath().getName());
+          LOG.info("File Size: " + finfo.getLen());
+          ParquetReader<GenericRecord> pr = ParquetReader.builder(new AvroReadSupport<GenericRecord>(), finfo.getPath()).build();
+          for(int i = 0; i < 10; i++) {
+            GenericRecord nextRecord = pr.read();
+            int expected = ByteBuffer.wrap(chunks.get(i).getData()).getInt();
+            LOG.info("expected: " + expected);
+            ByteBuffer content = (ByteBuffer) nextRecord.get("data");
+            int actual = content.getInt();
+            LOG.info("actual: " + actual);
+            Assert.assertSame(expected, actual);
+          }
+        }
+        fs.delete(finfo.getPath(), true);
+      }
+    } catch (WriterException e) {
+      Assert.fail(e.getMessage());
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Test file rotation interval.
+   */
+  public void testRotate() {
+    // Write 10 chunks
+    ArrayList<Chunk> chunks = new ArrayList<Chunk>();
+    for(int i = 0; i < 10; i++) {
+      ChunkBuilder c = new ChunkBuilder();
+      c.addRecord(ByteBuffer.allocate(Integer.SIZE).putInt(i).array());
+      chunks.add(c.getChunk());
+    }
+    try {
+      Configuration conf = new Configuration();
+      String outputPath = System.getProperty("test.log.dir") + "/testParquetRotate";
+      conf.set("chukwaCollector.outputDir", outputPath);
+      conf.setLong("chukwaCollector.rotateInterval", 3000L);
+      ChukwaWriter parquetWriter = new ChukwaParquetWriter(conf);
+      for(int i = 0; i < 2; i++) {
+        parquetWriter.add(chunks);
+        try {
+          Thread.sleep(3000L);
+        } catch (InterruptedException e) {
+          Assert.fail(e.getMessage());
+        }
+      }
+      parquetWriter.close();
+      FileSystem fs = FileSystem.get(conf);
+      // Verify that at least two files were rotated out
+      Path file = new Path(outputPath);
+      FileStatus[] status = fs.listStatus(file);
+      Assert.assertTrue(status.length >= 2);
+    } catch (WriterException e) {
+      Assert.fail(e.getMessage());
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}
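----------------------------------------------------------------------

For reference, below is a minimal usage sketch of the new writer outside the agent
pipeline, modeled on TestChukwaParquetWriter above. The class name
ChukwaParquetWriterExample and the output directory /tmp/chukwa-parquet are
illustrative placeholders and not part of this commit; the rest only uses the APIs
introduced or exercised by the patch (write chunks, close() renames the output file
with a .done suffix, then read the records back with ParquetReader).

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.ChunkBuilder;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.parquet.ChukwaParquetWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetReader;

public class ChukwaParquetWriterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative output directory; any path reachable through FileSystem.get(conf) works.
    String outputDir = "/tmp/chukwa-parquet";
    conf.set(ChukwaParquetWriter.OUTPUT_DIR_OPT, outputDir);

    // Build a few chunks and push them through the writer.
    List<Chunk> chunks = new ArrayList<Chunk>();
    for (int i = 0; i < 3; i++) {
      ChunkBuilder cb = new ChunkBuilder();
      cb.addRecord(ByteBuffer.allocate(Integer.SIZE).putInt(i).array());
      chunks.add(cb.getChunk());
    }
    ChukwaWriter writer = new ChukwaParquetWriter(conf);
    writer.add(chunks);
    // close() renames the current output file with a .done suffix.
    writer.close();

    // Read the completed Parquet files back as Avro GenericRecords.
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus finfo : fs.listStatus(new Path(outputDir))) {
      if (!finfo.getPath().getName().endsWith(".done")) {
        continue;
      }
      ParquetReader<GenericRecord> reader =
          ParquetReader.builder(new AvroReadSupport<GenericRecord>(), finfo.getPath()).build();
      GenericRecord record;
      while ((record = reader.read()) != null) {
        ByteBuffer data = (ByteBuffer) record.get("data");
        System.out.println(record.get("dataType") + ": " + data.getInt());
      }
      reader.close();
    }
  }
}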
