Author: ctubbsii
Date: Fri Jan 18 03:18:29 2013
New Revision: 1435013

URL: http://svn.apache.org/viewvc?rev=1435013&view=rev
Log:
ACCUMULO-955 Made BatchWriterConfig Writable, so it can be stored in a job's
configuration in a human-readable way. Updated AccumuloOutputFormat to use it,
and added unit tests for ACCUMULO-706 and ACCUMULO-955. Added an additional
check for a reasonable minimum maxMemory value.
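
For reference, configuring a job against the new API looks roughly like the
following sketch (the memory/latency/thread values and the bare Job setup are
illustrative, not part of this commit):

  import java.util.concurrent.TimeUnit;
  import org.apache.accumulo.core.client.BatchWriterConfig;
  import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
  import org.apache.hadoop.mapreduce.Job;

  // Inside a job-setup method; table/instance/credential wiring omitted.
  Job job = new Job();
  BatchWriterConfig bwConfig = new BatchWriterConfig()
      .setMaxMemory(10 * 1024 * 1024)       // must be at least 1K per the new check
      .setMaxLatency(2, TimeUnit.MINUTES)
      .setMaxWriteThreads(4);
  AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);

The single setBatchWriterOptions call replaces the per-setting methods
(setMaxMutationBufferSize, setMaxLatency, setMaxWriteThreads, setTimeout) that
this commit removes from AccumuloOutputFormat.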

Added:
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
    accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
    accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java Fri Jan 18 03:18:29 2013
@@ -16,24 +16,43 @@
  */
 package org.apache.accumulo.core.client;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+
 /**
  * This object holds configuration settings used to instantiate a {@link BatchWriter}
  */
-public class BatchWriterConfig {
-  private long maxMemory = 50 * 1024 * 1024;
-  private long maxLatency = 120000;
-  private long timeout = Long.MAX_VALUE;
-  private int maxWriteThreads = 3;
+public class BatchWriterConfig implements Writable {
+  
+  private static final Long DEFAULT_MAX_MEMORY = 50 * 1024 * 1024l;
+  private Long maxMemory = null;
+  
+  private static final Long DEFAULT_MAX_LATENCY = 2 * 60 * 1000l;
+  private Long maxLatency = null;
+  
+  private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE;
+  private Long timeout = null;
+  
+  private static final Integer DEFAULT_MAX_WRITE_THREADS = 3;
+  private Integer maxWriteThreads = null;
   
   /**
    * 
    * @param maxMemory
-   *          size in bytes of the maximum memory to batch before writing. Defaults to 50M.
+   *          size in bytes of the maximum memory to batch before writing. Minimum 1K. Defaults to 50M.
    */
   
   public BatchWriterConfig setMaxMemory(long maxMemory) {
+    if (maxMemory < 1024)
+      throw new IllegalArgumentException("Max memory is too low at " + 
maxMemory + ". Minimum 1K.");
     this.maxMemory = maxMemory;
     return this;
   }
@@ -93,18 +112,76 @@ public class BatchWriterConfig {
   }
   
   public long getMaxMemory() {
-    return maxMemory;
+    return maxMemory != null ? maxMemory : DEFAULT_MAX_MEMORY;
   }
   
   public long getMaxLatency(TimeUnit timeUnit) {
-    return timeUnit.convert(maxLatency, TimeUnit.MILLISECONDS);
+    return timeUnit.convert(maxLatency != null ? maxLatency : DEFAULT_MAX_LATENCY, TimeUnit.MILLISECONDS);
   }
   
   public long getTimeout(TimeUnit timeUnit) {
-    return timeUnit.convert(timeout, TimeUnit.MILLISECONDS);
+    return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
   }
   
   public int getMaxWriteThreads() {
-    return maxWriteThreads;
+    return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS;
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // write this out in a human-readable way
+    ArrayList<String> fields = new ArrayList<String>();
+    if (maxMemory != null)
+      addField(fields, "maxMemory", maxMemory);
+    if (maxLatency != null)
+      addField(fields, "maxLatency", maxLatency);
+    if (maxWriteThreads != null)
+      addField(fields, "maxWriteThreads", maxWriteThreads);
+    if (timeout != null)
+      addField(fields, "timeout", timeout);
+    String output = StringUtils.join(",", fields);
+    
+    byte[] bytes = output.getBytes(Charset.forName("UTF-8"));
+    byte[] len = String.format("%6s#", Integer.toString(bytes.length, 36)).getBytes("UTF-8");
+    if (len.length != 7)
+      throw new IllegalStateException("encoded length does not match expected value");
+    out.write(len);
+    out.write(bytes);
+  }
+  
+  private void addField(List<String> fields, String name, Object value) {
+    String key = StringUtils.escapeString(name, '\\', new char[] {',', '='});
+    String val = StringUtils.escapeString(String.valueOf(value), '\\', new char[] {',', '='});
+    fields.add(key + '=' + val);
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] len = new byte[7];
+    in.readFully(len);
+    String strLen = new String(len, Charset.forName("UTF-8"));
+    if (!strLen.endsWith("#"))
+      throw new IllegalStateException("length was not encoded correctly");
+    byte[] bytes = new byte[Integer.parseInt(strLen.substring(strLen.lastIndexOf(' ') + 1, strLen.length() - 1), 36)];
+    in.readFully(bytes);
+    
+    String strFields = new String(bytes, Charset.forName("UTF-8"));
+    String[] fields = StringUtils.split(strFields, '\\', ',');
+    for (String field : fields) {
+      String[] keyValue = StringUtils.split(field, '\\', '=');
+      String key = keyValue[0];
+      String value = keyValue[1];
+      if ("maxMemory".equals(key)) {
+        maxMemory = Long.valueOf(value);
+      } else if ("maxLatency".equals(key)) {
+        maxLatency = Long.valueOf(value);
+      } else if ("maxWriteThreads".equals(key)) {
+        maxWriteThreads = Integer.valueOf(value);
+      } else if ("timeout".equals(key)) {
+        timeout = Long.valueOf(value);
+      } else {
+        /* ignore any other properties */
+      }
+    }
   }
 }

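As the new write()/readFields() methods above show, the serialized form is a
6-character base-36 length, a '#' separator, and then comma-separated,
backslash-escaped key=value pairs for only the fields that were explicitly set.
A minimal sketch of inspecting that form (the class name here is illustrative):

  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;
  import java.nio.charset.Charset;
  import java.util.concurrent.TimeUnit;
  import org.apache.accumulo.core.client.BatchWriterConfig;

  public class BatchWriterConfigWireFormatDemo {
    public static void main(String[] args) throws IOException {
      BatchWriterConfig bwConfig = new BatchWriterConfig();
      bwConfig.setMaxWriteThreads(24);
      bwConfig.setTimeout(3, TimeUnit.SECONDS);

      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      bwConfig.write(new DataOutputStream(baos));
      // Expected output, per BatchWriterConfigTest below:
      //   "     v#maxWriteThreads=24,timeout=3000"
      System.out.println(new String(baos.toByteArray(), Charset.forName("UTF-8")));
    }
  }
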
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java Fri Jan 18 03:18:29 2013
@@ -16,7 +16,12 @@
  */
 package org.apache.accumulo.core.client.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map.Entry;
@@ -141,7 +146,15 @@ public class AccumuloOutputFormat extend
    * @see #setMockInstance(Job, String)
    * @see #getInstance(JobContext)
    */
-  private static final String MOCK = ".useMockInstance";
+  private static final String MOCK = PREFIX + ".useMockInstance";
+  
+  /**
+   * Key for storing the {@link BatchWriterConfig}.
+   * 
+   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
+   * @see #getBatchWriterOptions(JobContext)
+   */
+  private static final String BATCH_WRITER_CONFIG = PREFIX + ".bwConfig";
   
   /**
    * Key for storing the directive to create tables that don't exist
@@ -232,35 +245,26 @@ public class AccumuloOutputFormat extend
   }
   
   /**
+   * Sets the configuration for the job's {@link BatchWriter} instances. If not set, a new {@link BatchWriterConfig}, with sensible built-in defaults, is
+   * used. Setting the configuration multiple times overwrites any previous configuration.
+   * 
+   * @param job
+   *          the Hadoop job instance to be configured
+   * @param bwConfig
+   *          the configuration for the {@link BatchWriter}
    * @since 1.5.0
-   * @see BatchWriterConfig#setMaxMemory(long)
-   */
-  public static void setMaxMutationBufferSize(Job job, long numberOfBytes) {
-    job.getConfiguration().setLong(MAX_MUTATION_BUFFER_SIZE, numberOfBytes);
-  }
-  
-  /**
-   * @since 1.5.0
-   * @see BatchWriterConfig#setMaxLatency(long, TimeUnit)
-   */
-  public static void setMaxLatency(Job job, int numberOfMilliseconds) {
-    job.getConfiguration().setInt(MAX_LATENCY, numberOfMilliseconds);
-  }
-  
-  /**
-   * @since 1.5.0
-   * @see BatchWriterConfig#setMaxWriteThreads(int)
-   */
-  public static void setMaxWriteThreads(Job job, int numberOfThreads) {
-    job.getConfiguration().setInt(NUM_WRITE_THREADS, numberOfThreads);
-  }
-  
-  /**
-   * @since 1.5.0
-   * @see BatchWriterConfig#setTimeout(long, TimeUnit)
    */
-  public static void setTimeout(Job job, long time, TimeUnit timeUnit) {
-    job.getConfiguration().setLong(TIMEOUT, timeUnit.toMillis(time));
+  public static void setBatchWriterOptions(Job job, BatchWriterConfig bwConfig) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    String serialized;
+    try {
+      bwConfig.write(new DataOutputStream(baos));
+      serialized = new String(baos.toByteArray(), Charset.forName("UTF-8"));
+      baos.close();
+    } catch (IOException e) {
+      throw new IllegalArgumentException("unable to serialize " + 
BatchWriterConfig.class.getName());
+    }
+    job.getConfiguration().set(BATCH_WRITER_CONFIG, serialized);
   }
   
   /**
@@ -363,55 +367,29 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * Gets the corresponding {@link BatchWriterConfig} setting.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the max memory to use (in bytes)
-   * @since 1.5.0
-   * @see #setMaxMutationBufferSize(Job, long)
-   */
-  protected static long getMaxMutationBufferSize(JobContext context) {
-    return context.getConfiguration().getLong(MAX_MUTATION_BUFFER_SIZE, new BatchWriterConfig().getMaxMemory());
-  }
-  
-  /**
-   * Gets the corresponding {@link BatchWriterConfig} setting.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the max latency to use (in millis)
-   * @since 1.5.0
-   * @see #setMaxLatency(Job, int)
-   */
-  protected static long getMaxLatency(JobContext context) {
-    return context.getConfiguration().getLong(MAX_LATENCY, new BatchWriterConfig().getMaxLatency(TimeUnit.MILLISECONDS));
-  }
-  
-  /**
-   * Gets the corresponding {@link BatchWriterConfig} setting.
-   * 
-   * @param context
-   *          the Hadoop context for the configured job
-   * @return the max write threads to use
-   * @since 1.5.0
-   * @see #setMaxWriteThreads(Job, int)
-   */
-  protected static int getMaxWriteThreads(JobContext context) {
-    return context.getConfiguration().getInt(NUM_WRITE_THREADS, new BatchWriterConfig().getMaxWriteThreads());
-  }
-  
-  /**
-   * Gets the corresponding {@link BatchWriterConfig} setting.
+   * Gets the {@link BatchWriterConfig} settings.
    * 
    * @param context
    *          the Hadoop context for the configured job
-   * @return the timeout for write operations
+   * @return the configuration object
    * @since 1.5.0
-   * @see #setTimeout(Job, long, TimeUnit)
+   * @see #setBatchWriterOptions(Job, BatchWriterConfig)
    */
-  protected static long getTimeout(JobContext context) {
-    return context.getConfiguration().getLong(TIMEOUT, Long.MAX_VALUE);
+  protected static BatchWriterConfig getBatchWriterOptions(JobContext context) {
+    String serialized = context.getConfiguration().get(BATCH_WRITER_CONFIG);
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    if (serialized == null || serialized.isEmpty()) {
+      return bwConfig;
+    } else {
+      try {
+        ByteArrayInputStream bais = new ByteArrayInputStream(serialized.getBytes(Charset.forName("UTF-8")));
+        bwConfig.readFields(new DataInputStream(bais));
+        bais.close();
+        return bwConfig;
+      } catch (IOException e) {
+        throw new IllegalArgumentException("unable to serialize " + 
BatchWriterConfig.class.getName());
+      }
+    }
   }
   
   /**
@@ -475,9 +453,7 @@ public class AccumuloOutputFormat extend
       
       if (!simulate) {
         this.conn = getInstance(context).getConnector(getUsername(context), getPassword(context));
-        mtbw = conn.createMultiTableBatchWriter(new BatchWriterConfig().setMaxMemory(getMaxMutationBufferSize(context))
-            .setMaxLatency(getMaxLatency(context), TimeUnit.MILLISECONDS).setMaxWriteThreads(getMaxWriteThreads(context))
-            .setTimeout(getTimeout(context), TimeUnit.MILLISECONDS));
+        mtbw = conn.createMultiTableBatchWriter(getBatchWriterOptions(context));
       }
     }
     
@@ -658,12 +634,6 @@ public class AccumuloOutputFormat extend
   private static final String NUM_WRITE_THREADS = PREFIX + ".writethreads";
   
   /**
-   * @deprecated since 1.5.0;
-   */
-  @Deprecated
-  private static final String TIMEOUT = PREFIX + ".timeout";
-  
-  /**
    * @deprecated since 1.5.0; Use {@link #setOutputInfo(Job, String, byte[], boolean, String)} instead.
    */
   @Deprecated
@@ -705,7 +675,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #setMaxMutationBufferSize(Job, long)} instead.
+   * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
    */
   @Deprecated
   public static void setMaxMutationBufferSize(Configuration conf, long numberOfBytes) {
@@ -713,7 +683,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #setMaxLatency(Job, int)} instead.
+   * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
    */
   @Deprecated
   public static void setMaxLatency(Configuration conf, int numberOfMilliseconds) {
@@ -721,7 +691,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #setMaxWriteThreads(Job, int)} instead.
+   * @deprecated since 1.5.0; Use {@link #setBatchWriterOptions(Job, BatchWriterConfig)} instead.
    */
   @Deprecated
   public static void setMaxWriteThreads(Configuration conf, int numberOfThreads) {
@@ -788,7 +758,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getMaxMutationBufferSize(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead.
    */
   @Deprecated
   protected static long getMaxMutationBufferSize(Configuration conf) {
@@ -796,7 +766,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getMaxLatency(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead.
    */
   @Deprecated
   protected static int getMaxLatency(Configuration conf) {
@@ -806,7 +776,7 @@ public class AccumuloOutputFormat extend
   }
   
   /**
-   * @deprecated since 1.5.0; Use {@link #getMaxWriteThreads(JobContext)} instead.
+   * @deprecated since 1.5.0; Use {@link #getBatchWriterOptions(JobContext)} instead.
    */
   @Deprecated
   protected static int getMaxWriteThreads(Configuration conf) {

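For jobs migrating off the removed Job-based setters (or the Configuration-based
ones that remain deprecated above), the per-setting calls collapse into one
setBatchWriterOptions call, as the TeraSortIngest and ContinuousMoru changes
further below also do. A rough sketch, assuming the imports from the earlier
example and illustrative values:

  // Before (methods removed by this commit):
  //   AccumuloOutputFormat.setMaxMutationBufferSize(job, 10L * 1000 * 1000);
  //   AccumuloOutputFormat.setMaxLatency(job, 120 * 1000);
  //   AccumuloOutputFormat.setMaxWriteThreads(job, 4);
  // After:
  AccumuloOutputFormat.setBatchWriterOptions(job,
      new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000).setMaxLatency(120, TimeUnit.SECONDS).setMaxWriteThreads(4));
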
Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java Fri Jan 18 03:18:29 2013
@@ -47,8 +47,9 @@ public class DeleteCommand extends Comma
     return Long.MAX_VALUE;
   }
   
-  public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
-      IOException, ConstraintViolationException {
+  @Override
+  public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException, IOException, ConstraintViolationException {
     shellState.checkTableState();
     
     final Mutation m = new Mutation(new Text(cl.getArgs()[0].getBytes(Shell.CHARSET)));
@@ -68,7 +69,7 @@ public class DeleteCommand extends Comma
       m.putDelete(colf, colq);
     }
     final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
-        new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
+        new BatchWriterConfig().setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024)).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
     bw.addMutation(m);
     bw.close();
     return 0;
@@ -100,7 +101,7 @@ public class DeleteCommand extends Comma
         "time before insert should fail if no data is written. If no unit is 
given assumes seconds.  Units d,h,m,s,and ms are supported.  e.g. 30s or 
100ms");
     timeoutOption.setArgName("timeout");
     o.addOption(timeoutOption);
-
+    
     return o;
   }
   

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java Fri Jan 18 03:18:29 2013
@@ -54,9 +54,10 @@ public class InsertCommand extends Comma
     
     return Long.MAX_VALUE;
   }
-
-  public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException, TableNotFoundException,
-      IOException, ConstraintViolationException {
+  
+  @Override
+  public int execute(final String fullCommand, final CommandLine cl, final Shell shellState) throws AccumuloException, AccumuloSecurityException,
+      TableNotFoundException, IOException, ConstraintViolationException {
     shellState.checkTableState();
     
     final Mutation m = new Mutation(new Text(cl.getArgs()[0].getBytes(Shell.CHARSET)));
@@ -78,7 +79,7 @@ public class InsertCommand extends Comma
       m.put(colf, colq, val);
     
     final BatchWriter bw = shellState.getConnector().createBatchWriter(shellState.getTableName(),
-        new BatchWriterConfig().setMaxMemory(m.estimatedMemoryUsed()).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
+        new BatchWriterConfig().setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024)).setMaxWriteThreads(1).setTimeout(getTimeout(cl), TimeUnit.MILLISECONDS));
     bw.addMutation(m);
     try {
       bw.close();
@@ -103,7 +104,7 @@ public class InsertCommand extends Comma
         if (e.getCause() != null)
           lines.add("   Caused by : " + e.getCause().getClass().getName() + " 
: " + e.getCause().getMessage());
       }
-
+      
       shellState.printLines(lines.iterator(), false);
     }
     return 0;
@@ -134,7 +135,7 @@ public class InsertCommand extends Comma
         "time before insert should fail if no data is written. If no unit is 
given assumes seconds.  Units d,h,m,s,and ms are supported.  e.g. 30s or 
100ms");
     timeoutOption.setArgName("timeout");
     o.addOption(timeoutOption);
-
+    
     return o;
   }
   

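The Math.max(..., 1024) clamp in the two shell commands above follows from the
new 1K minimum on BatchWriterConfig.setMaxMemory(): a single small mutation can
estimate well under 1024 bytes, which the setter would now reject. A minimal
sketch of the pattern (row/column values are illustrative):

  import org.apache.accumulo.core.client.BatchWriterConfig;
  import org.apache.accumulo.core.data.Mutation;
  import org.apache.accumulo.core.data.Value;
  import org.apache.hadoop.io.Text;

  // A tiny mutation reports an estimated size far below 1024 bytes, which
  // setMaxMemory() now rejects, so round the buffer size up to the minimum.
  Mutation m = new Mutation(new Text("row"));
  m.put(new Text("cf"), new Text("cq"), new Value("v".getBytes()));
  BatchWriterConfig cfg = new BatchWriterConfig()
      .setMaxMemory(Math.max(m.estimatedMemoryUsed(), 1024))
      .setMaxWriteThreads(1);
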
Added: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java?rev=1435013&view=auto
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java (added)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/BatchWriterConfigTest.java Fri Jan 18 03:18:29 2013
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class BatchWriterConfigTest {
+  
+  @Test
+  public void testReasonableDefaults() {
+    long expectedMaxMemory = 50 * 1024 * 1024l;
+    long expectedMaxLatency = 120000l;
+    long expectedTimeout = Long.MAX_VALUE;
+    int expectedMaxWriteThreads = 3;
+    
+    BatchWriterConfig defaults = new BatchWriterConfig();
+    assertEquals(expectedMaxMemory, defaults.getMaxMemory());
+    assertEquals(expectedMaxLatency, defaults.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertEquals(expectedTimeout, defaults.getTimeout(TimeUnit.MILLISECONDS));
+    assertEquals(expectedMaxWriteThreads, defaults.getMaxWriteThreads());
+  }
+  
+  @Test
+  public void testOverridingDefaults() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxMemory(1123581321l);
+    bwConfig.setMaxLatency(22, TimeUnit.HOURS);
+    bwConfig.setTimeout(33, TimeUnit.DAYS);
+    bwConfig.setMaxWriteThreads(42);
+    
+    assertEquals(1123581321l, bwConfig.getMaxMemory());
+    assertEquals(22 * 60 * 60 * 1000l, bwConfig.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertEquals(33 * 24 * 60 * 60 * 1000l, bwConfig.getTimeout(TimeUnit.MILLISECONDS));
+    assertEquals(42, bwConfig.getMaxWriteThreads());
+  }
+  
+  @Test
+  public void testZeroTimeoutAndLatency() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(0, TimeUnit.MILLISECONDS);
+    bwConfig.setTimeout(0, TimeUnit.MILLISECONDS);
+    
+    assertEquals(Long.MAX_VALUE, bwConfig.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertEquals(Long.MAX_VALUE, bwConfig.getTimeout(TimeUnit.MILLISECONDS));
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testMaxMemoryTooLow() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxMemory(1024 - 1);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeMaxLatency() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(-1, TimeUnit.DAYS);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeTimeout() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setTimeout(-1, TimeUnit.DAYS);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testZeroMaxWriteThreads() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxWriteThreads(0);
+  }
+  
+  @Test(expected = IllegalArgumentException.class)
+  public void testNegativeMaxWriteThreads() {
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxWriteThreads(-1);
+  }
+  
+  @Test
+  public void testSerialize() throws IOException {
+    // make sure we aren't testing defaults
+    final BatchWriterConfig bwDefaults = new BatchWriterConfig();
+    assertNotEquals(7654321l, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertNotEquals(9898989l, bwDefaults.getTimeout(TimeUnit.MILLISECONDS));
+    assertNotEquals(42, bwDefaults.getMaxWriteThreads());
+    assertNotEquals(1123581321l, bwDefaults.getMaxMemory());
+    
+    // test setting all fields
+    BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(7654321l, TimeUnit.MILLISECONDS);
+    bwConfig.setTimeout(9898989l, TimeUnit.MILLISECONDS);
+    bwConfig.setMaxWriteThreads(42);
+    bwConfig.setMaxMemory(1123581321l);
+    byte[] bytes = createBytes(bwConfig);
+    checkBytes(bwConfig, bytes);
+    
+    // test human-readable serialization
+    bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxWriteThreads(42);
+    bytes = createBytes(bwConfig);
+    assertEquals("     i#maxWriteThreads=42", new String(bytes, 
Charset.forName("UTF-8")));
+    checkBytes(bwConfig, bytes);
+    
+    // test human-readable with 2 fields
+    bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxWriteThreads(24);
+    bwConfig.setTimeout(3, TimeUnit.SECONDS);
+    bytes = createBytes(bwConfig);
+    assertEquals("     v#maxWriteThreads=24,timeout=3000", new String(bytes, 
Charset.forName("UTF-8")));
+    checkBytes(bwConfig, bytes);
+  }
+  
+  private byte[] createBytes(BatchWriterConfig bwConfig) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    bwConfig.write(new DataOutputStream(baos));
+    return baos.toByteArray();
+  }
+  
+  private void checkBytes(BatchWriterConfig bwConfig, byte[] bytes) throws IOException {
+    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+    BatchWriterConfig createdConfig = new BatchWriterConfig();
+    createdConfig.readFields(new DataInputStream(bais));
+    
+    assertEquals(bwConfig.getMaxMemory(), createdConfig.getMaxMemory());
+    assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), createdConfig.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), createdConfig.getTimeout(TimeUnit.MILLISECONDS));
+    assertEquals(bwConfig.getMaxWriteThreads(), createdConfig.getMaxWriteThreads());
+  }
+  
+}

Modified: accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java (original)
+++ accumulo/trunk/core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java Fri Jan 18 03:18:29 2013
@@ -18,12 +18,14 @@ package org.apache.accumulo.core.client.
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -39,6 +41,7 @@ import org.apache.accumulo.core.util.Cac
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -120,6 +123,46 @@ public class AccumuloOutputFormatTest {
   }
   
   @Test
+  public void testBWSettings() throws IOException {
+    Job job = new Job();
+    
+    // make sure we aren't testing defaults
+    final BatchWriterConfig bwDefaults = new BatchWriterConfig();
+    assertNotEquals(7654321l, bwDefaults.getMaxLatency(TimeUnit.MILLISECONDS));
+    assertNotEquals(9898989l, bwDefaults.getTimeout(TimeUnit.MILLISECONDS));
+    assertNotEquals(42, bwDefaults.getMaxWriteThreads());
+    assertNotEquals(1123581321l, bwDefaults.getMaxMemory());
+    
+    final BatchWriterConfig bwConfig = new BatchWriterConfig();
+    bwConfig.setMaxLatency(7654321l, TimeUnit.MILLISECONDS);
+    bwConfig.setTimeout(9898989l, TimeUnit.MILLISECONDS);
+    bwConfig.setMaxWriteThreads(42);
+    bwConfig.setMaxMemory(1123581321l);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
+    
+    AccumuloOutputFormat myAOF = new AccumuloOutputFormat() {
+      @Override
+      public void checkOutputSpecs(JobContext job) throws IOException {
+        BatchWriterConfig bwOpts = getBatchWriterOptions(job);
+        
+        // passive check
+        assertEquals(bwConfig.getMaxLatency(TimeUnit.MILLISECONDS), bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
+        assertEquals(bwConfig.getTimeout(TimeUnit.MILLISECONDS), bwOpts.getTimeout(TimeUnit.MILLISECONDS));
+        assertEquals(bwConfig.getMaxWriteThreads(), bwOpts.getMaxWriteThreads());
+        assertEquals(bwConfig.getMaxMemory(), bwOpts.getMaxMemory());
+        
+        // explicit check
+        assertEquals(7654321l, bwOpts.getMaxLatency(TimeUnit.MILLISECONDS));
+        assertEquals(9898989l, bwOpts.getTimeout(TimeUnit.MILLISECONDS));
+        assertEquals(42, bwOpts.getMaxWriteThreads());
+        assertEquals(1123581321l, bwOpts.getMaxMemory());
+        
+      }
+    };
+    myAOF.checkOutputSpecs(job);
+  }
+  
+  @Test
   public void testMR() throws Exception {
     MockInstance mockInstance = new MockInstance("testmrinstance");
     Connector c = mockInstance.getConnector("root", new byte[] {});

Modified: accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java (original)
+++ accumulo/trunk/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/TeraSortIngest.java Fri Jan 18 03:18:29 2013
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
@@ -383,7 +384,8 @@ public class TeraSortIngest extends Conf
     
     job.setOutputFormatClass(AccumuloOutputFormat.class);
     opts.setAccumuloConfigs(job);
-    AccumuloOutputFormat.setMaxMutationBufferSize(job, 10L * 1000 * 1000);
+    BatchWriterConfig bwConfig = new BatchWriterConfig().setMaxMemory(10L * 1000 * 1000);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwConfig);
     
     Configuration conf = job.getConfiguration();
     conf.setLong(NUMROWS, opts.numRows);

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java?rev=1435013&r1=1435012&r2=1435013&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousMoru.java Fri Jan 18 03:18:29 2013
@@ -149,9 +149,7 @@ public class ContinuousMoru extends Conf
     job.setNumReduceTasks(0);
     
     job.setOutputFormatClass(AccumuloOutputFormat.class);
-    AccumuloOutputFormat.setMaxLatency(job, Integer.parseInt(String.valueOf(bwOpts.batchLatency)));
-    AccumuloOutputFormat.setMaxMutationBufferSize(job, bwOpts.batchMemory);
-    AccumuloOutputFormat.setMaxWriteThreads(job, bwOpts.batchThreads);
+    AccumuloOutputFormat.setBatchWriterOptions(job, bwOpts.getBatchWriterConfig());
     
     Configuration conf = job.getConfiguration();
     conf.setLong(MIN, opts.min);

