[ 
https://issues.apache.org/jira/browse/SQOOP-604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456593#comment-13456593
 ] 

Zoltan Toth-Czifra edited comment on SQOOP-604 at 9/17/12 3:26 AM:
-------------------------------------------------------------------

{code}
diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java 
b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
old mode 100644
new mode 100755
index a4e8b88..648c1e2
--- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
@@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN>
   // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
   protected long checkpointDistInBytes;

+  /** Configuration key that specifies the number of milliseconds
+   * to sleep at the end of each checkpoint commit
+   * Default is 0, no sleep.
+   */
+  public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
+      "sqoop.mysql.export.sleep.ms";
+
+  public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
+
+  // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
+  protected long checkpointSleepMs;
+
   protected Configuration conf;

   /** The FIFO being used to communicate with mysqlimport. */
@@ -314,6 +326,13 @@ public class MySQLExportMapper<KEYIN, VALIN>
       LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
       this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
     }
+
+    this.checkpointSleepMs = conf.getLong(
+        MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);
+    if (this.checkpointDistInBytes < 0) {
+      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
+      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_SLEEP_MS;
+    }
   }

   /**
@@ -347,6 +366,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
     if (this.checkpointDistInBytes != 0
         && this.bytesWritten > this.checkpointDistInBytes) {
       LOG.info("Checkpointing current export.");
+
+      if(this.checkpointSleepMs != 0) {
+        LOG.info("Pausing.");
+        Thread.sleep(this.checkpointSleepMs);
+      }
+
       closeExportHandles();
       initMySQLImportProcess();
       this.bytesWritten = 0;
{code}
                
      was (Author: tcz):
    diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java 
b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
old mode 100644
new mode 100755
index a4e8b88..648c1e2
--- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
@@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN>
   // Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
   protected long checkpointDistInBytes;

+  /** Configuration key that specifies the number of milliseconds
+   * to sleep at the end of each checkpoint commit
+   * Default is 0, no sleep.
+   */
+  public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
+      "sqoop.mysql.export.sleep.ms";
+
+  public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
+
+  // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
+  protected long checkpointSleepMs;
+
   protected Configuration conf;

   /** The FIFO being used to communicate with mysqlimport. */
@@ -314,6 +326,13 @@ public class MySQLExportMapper<KEYIN, VALIN>
       LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
       this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
     }
+
+    this.checkpointSleepMs = conf.getLong(
+        MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);
+    if (this.checkpointDistInBytes < 0) {
+      LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
+      this.checkpointDistInBytes = DEFAULT_CHECKPOINT_SLEEP_MS;
+    }
   }

   /**
@@ -347,6 +366,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
     if (this.checkpointDistInBytes != 0
         && this.bytesWritten > this.checkpointDistInBytes) {
       LOG.info("Checkpointing current export.");
+
+      if(this.checkpointSleepMs != 0) {
+        LOG.info("Pausing.");
+        Thread.sleep(this.checkpointSleepMs);
+      }
+
       closeExportHandles();
       initMySQLImportProcess();
       this.bytesWritten = 0;

                  
> Easy throttling feature for MySQL exports
> -----------------------------------------
>
>                 Key: SQOOP-604
>                 URL: https://issues.apache.org/jira/browse/SQOOP-604
>             Project: Sqoop
>          Issue Type: Improvement
>          Components: connectors/mysql
>    Affects Versions: 1.4.3
>            Reporter: Zoltan Toth-Czifra
>            Priority: Minor
>             Fix For: 1.4.3
>
>
> Sqoop always tries to achieve the best possible throughput with exports, 
> which might not be desirable in all cases. Sometimes we need to export large 
> data with Sqoop to a live relational database (MySQL in our case), that is, a 
> database that is under a high load serving random queries from the users of 
> our product.
> While data consistency issues during the export can be easily solved with a 
> staging table, there is still a problem: the performance impact caused by the 
> heavy export. 
> First off, the resources of MySQL dedicated to the import process can affect 
> the performance of the live product, both on the master and on the slaves. 
> Second, even if the servers can handle the import with no significant 
> performance impact (mysqlimport should be relatively "cheap"), importing big 
> tables (GB+) can cause serious replication lag in the cluster risking data 
> consistency.
> My suggestion is quite simple. Using the already existing "checkpoint" 
> feature of the MySQL exports (the export process is restarted every X bytes 
> written), extending it with a new config value that would simply make the 
> thread sleep for X milliseconds at the checkbpoints. With low enough byte 
> count limit this can be a simple yet powerful throttling mechanism.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to