This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/tika.git
commit 43f06c904fcdb01244ba4c90fdbbfb881e92e0f6 Author: tallison <[email protected]> AuthorDate: Wed Apr 8 10:46:59 2020 -0400 TIKA-3085 -- switch to batch inserts in tika-eval --- .../java/org/apache/tika/eval/db/JDBCUtil.java | 30 +++++++++++++++++++ .../java/org/apache/tika/eval/io/DBWriter.java | 35 +++++++++++++++++----- 2 files changed, 58 insertions(+), 7 deletions(-) diff --git a/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java b/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java index 794c55b..5c3e427 100644 --- a/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java +++ b/tika-eval/src/main/java/org/apache/tika/eval/db/JDBCUtil.java @@ -140,6 +140,10 @@ public class JDBCUtil { return tables; } + @Deprecated + /** + * @deprecated use {@link #batchInsert(PreparedStatement, TableInfo, Map)} + */ public static int insert(PreparedStatement insertStatement, TableInfo table, Map<Cols, String> data) throws SQLException { @@ -165,6 +169,28 @@ public class JDBCUtil { } } + public static void batchInsert(PreparedStatement insertStatement, + TableInfo table, + Map<Cols, String> data) throws SQLException { + + try { + int i = 1; + for (ColInfo colInfo : table.getColInfos()) { + updateInsertStatement(i, insertStatement, colInfo, data.get(colInfo.getName())); + i++; + } + for (Cols c : data.keySet()) { + if (!table.containsColumn(c)) { + throw new IllegalArgumentException("Can't add data to " + c + + " because it doesn't exist in the table: " + table.getName()); + } + } + insertStatement.addBatch(); + } catch (SQLException e) { + LOG.warn("couldn't insert data for this row: {}", e.getMessage()); + } + } + public static void updateInsertStatement(int dbColOffset, PreparedStatement st, ColInfo colInfo, String value) throws SQLException { if (value == null) { @@ -178,9 +204,13 @@ public class JDBCUtil { value = value.substring(0, colInfo.getPrecision()); LOG.warn("truncated varchar value in {} : {}", colInfo.getName(), value); } + //postgres doesn't allow \0000 + value = value.replaceAll("\u0000", " "); st.setString(dbColOffset, value); break; case Types.CHAR: + //postgres doesn't allow \0000 + value = value.replaceAll("\u0000", " "); st.setString(dbColOffset, value); break; case Types.DOUBLE: diff --git a/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java b/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java index 8aea3cd..909727a 100644 --- a/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java +++ b/tika-eval/src/main/java/org/apache/tika/eval/io/DBWriter.java @@ -50,8 +50,8 @@ public class DBWriter implements IDBWriter { private static final Logger LOG = LoggerFactory.getLogger(DBWriter.class); private static final AtomicInteger WRITER_ID = new AtomicInteger(); - private final AtomicLong insertedRows = new AtomicLong(); - private final Long commitEveryX = 1000L; + private final Long commitEveryXRows = 10000L; + //private final Long commitEveryXMS = 60000L; private final Connection conn; private final JDBCUtil dbUtil; @@ -60,7 +60,7 @@ public class DBWriter implements IDBWriter { //<tableName, preparedStatement> private final Map<String, PreparedStatement> inserts = new HashMap<>(); - + private final Map<String, LastInsert> lastInsertMap = new HashMap<>(); public DBWriter(Connection connection, List<TableInfo> tableInfos, JDBCUtil dbUtil, MimeBuffer mimeBuffer) throws IOException, SQLException { @@ -71,6 +71,7 @@ public class DBWriter implements IDBWriter { try { PreparedStatement st = createPreparedInsert(tableInfo); inserts.put(tableInfo.getName(), st); + lastInsertMap.put(tableInfo.getName(), new LastInsert()); } catch (SQLException e) { throw new RuntimeException(e); } @@ -115,11 +116,19 @@ public class DBWriter implements IDBWriter { throw new RuntimeException("Failed to create prepared statement for: "+ table.getName()); } - dbUtil.insert(p, table, data); - long rows = insertedRows.incrementAndGet(); - if (rows % commitEveryX == 0) { - LOG.debug("writer ({}) is committing after {} rows", myId, rows); + dbUtil.batchInsert(p, table, data); + LastInsert lastInsert = lastInsertMap.get(table.getName()); + lastInsert.rowCount++; + long elapsed = System.currentTimeMillis()-lastInsert.lastInsert; + if ( + //elapsed > commitEveryXMS || + lastInsert.rowCount % commitEveryXRows == 0) { + LOG.info("writer ({}) on table ({}) is committing after {} rows and {} ms", myId, + table.getName(), + lastInsert.rowCount, elapsed); + p.executeBatch(); conn.commit(); + lastInsert.lastInsert = System.currentTimeMillis(); } } catch (SQLException e) { throw new IOException(e); @@ -127,6 +136,13 @@ public class DBWriter implements IDBWriter { } public void close() throws IOException { + for (PreparedStatement p : inserts.values()) { + try { + p.executeBatch(); + } catch (SQLException e) { + throw new IOExceptionWithCause(e); + } + } try { conn.commit(); } catch (SQLException e){ @@ -139,4 +155,9 @@ public class DBWriter implements IDBWriter { } } + + private class LastInsert { + private long lastInsert = System.currentTimeMillis(); + private long rowCount = 0; + } }
