[
https://issues.apache.org/jira/browse/SQOOP-604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13456593#comment-13456593
]
Zoltan Toth-Czifra edited comment on SQOOP-604 at 9/17/12 3:26 AM:
-------------------------------------------------------------------
{code}
diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
old mode 100644
new mode 100755
index a4e8b88..648c1e2
--- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
@@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN>
// Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
protected long checkpointDistInBytes;
+ /** Configuration key that specifies the number of milliseconds
+ * to sleep at the end of each checkpoint commit.
+ * Default is 0, no sleep.
+ */
+ public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
+ "sqoop.mysql.export.sleep.ms";
+
+ public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
+
+ // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
+ protected long checkpointSleepMs;
+
protected Configuration conf;
/** The FIFO being used to communicate with mysqlimport. */
@@ -314,6 +326,13 @@ public class MySQLExportMapper<KEYIN, VALIN>
LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
}
+
+ this.checkpointSleepMs = conf.getLong(
+ MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);
+ if (this.checkpointSleepMs < 0) { // Thread.sleep() rejects negative values.
+ LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
+ this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
+ }
}
/**
@@ -347,6 +366,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
if (this.checkpointDistInBytes != 0
&& this.bytesWritten > this.checkpointDistInBytes) {
LOG.info("Checkpointing current export.");
+
+ if(this.checkpointSleepMs != 0) {
+ LOG.info("Pausing.");
+ Thread.sleep(this.checkpointSleepMs);
+ }
+
closeExportHandles();
initMySQLImportProcess();
this.bytesWritten = 0;
{code}
was (Author: tcz):
diff --git a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
old mode 100644
new mode 100755
index a4e8b88..648c1e2
--- a/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/MySQLExportMapper.java
@@ -64,6 +64,18 @@ public class MySQLExportMapper<KEYIN, VALIN>
// Configured value for MSYQL_CHECKPOINT_BYTES_KEY.
protected long checkpointDistInBytes;
+ /** Configuration key that specifies the number of milliseconds
+ * to sleep at the end of each checkpoint commit.
+ * Default is 0, no sleep.
+ */
+ public static final String MYSQL_CHECKPOINT_SLEEP_KEY =
+ "sqoop.mysql.export.sleep.ms";
+
+ public static final long DEFAULT_CHECKPOINT_SLEEP_MS = 0;
+
+ // Configured value for MYSQL_CHECKPOINT_SLEEP_KEY.
+ protected long checkpointSleepMs;
+
protected Configuration conf;
/** The FIFO being used to communicate with mysqlimport. */
@@ -314,6 +326,13 @@ public class MySQLExportMapper<KEYIN, VALIN>
LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_BYTES_KEY);
this.checkpointDistInBytes = DEFAULT_CHECKPOINT_BYTES;
}
+
+ this.checkpointSleepMs = conf.getLong(
+ MYSQL_CHECKPOINT_SLEEP_KEY, DEFAULT_CHECKPOINT_SLEEP_MS);
+ if (this.checkpointSleepMs < 0) { // Thread.sleep() rejects negative values.
+ LOG.warn("Invalid value for " + MYSQL_CHECKPOINT_SLEEP_KEY);
+ this.checkpointSleepMs = DEFAULT_CHECKPOINT_SLEEP_MS;
+ }
}
/**
@@ -347,6 +366,12 @@ public class MySQLExportMapper<KEYIN, VALIN>
if (this.checkpointDistInBytes != 0
&& this.bytesWritten > this.checkpointDistInBytes) {
LOG.info("Checkpointing current export.");
+
+ if(this.checkpointSleepMs != 0) {
+ LOG.info("Pausing.");
+ Thread.sleep(this.checkpointSleepMs);
+ }
+
closeExportHandles();
initMySQLImportProcess();
this.bytesWritten = 0;
> Easy throttling feature for MySQL exports
> -----------------------------------------
>
> Key: SQOOP-604
> URL: https://issues.apache.org/jira/browse/SQOOP-604
> Project: Sqoop
> Issue Type: Improvement
> Components: connectors/mysql
> Affects Versions: 1.4.3
> Reporter: Zoltan Toth-Czifra
> Priority: Minor
> Fix For: 1.4.3
>
>
> Sqoop always tries to achieve the best possible throughput with exports,
> which might not be desirable in all cases. Sometimes we need to export large
> data with Sqoop to a live relational database (MySQL in our case), that is, a
> database that is under a high load serving random queries from the users of
> our product.
> While data consistency issues during the export can be easily solved with a
> staging table, there is still a problem: the performance impact caused by the
> heavy export.
> First off, the resources of MySQL dedicated to the import process can affect
> the performance of the live product, both on the master and on the slaves.
> Second, even if the servers can handle the import with no significant
> performance impact (mysqlimport should be relatively "cheap"), importing big
> tables (GB+) can cause serious replication lag in the cluster risking data
> consistency.
> My suggestion is quite simple: take the already existing "checkpoint"
> feature of the MySQL exports (the export process is restarted every X bytes
> written) and extend it with a new config value that would simply make the
> thread sleep for Y milliseconds at the checkpoints. With a low enough byte
> count limit, this can be a simple yet powerful throttling mechanism.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira