This is an automated email from the ASF dual-hosted git repository.
tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
new 28941613f TIKA-4234 (#1708)
28941613f is described below
commit 28941613fe02622fba79a7544db1270b3dabac92
Author: Tim Allison <[email protected]>
AuthorDate: Thu Apr 4 10:31:46 2024 -0400
TIKA-4234 (#1708)
* TIKA_4234 -- improve jdbc reporter
---
.../pipes/reporters/jdbc/JDBCPipesReporter.java | 126 +++++++++++++++++++--
.../reporters/jdbc/TestJDBCPipesReporter.java | 33 ++++++
.../resources/configs/tika-config-advanced.xml | 50 ++++++++
3 files changed, 199 insertions(+), 10 deletions(-)
diff --git
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index ee52bf80f..a1ffc8b15 100644
---
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -69,6 +69,14 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
private String connectionString;
+ private boolean createTable = true;
+
+ private String tableName = TABLE_NAME;
+
+ private String reportSql;
+
+ private List<String> reportVariables;
+
private Optional<String> postConnectionString = Optional.empty();
private final ArrayBlockingQueue<IdStatusPair> queue =
new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
@@ -80,6 +88,15 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
if (StringUtils.isBlank(connectionString)) {
throw new TikaConfigException("Must specify a connectionString");
}
+ if (reportVariables == null) {
+ reportVariables = new ArrayList<>();
+ reportVariables.add("id");
+ reportVariables.add("status");
+ reportVariables.add("timestamp");
+ }
+ if (reportSql == null) {
+ reportSql = "insert into " + getTableName() + " (id, status,
timestamp) values (?,?,?)";
+ }
ReportWorker reportWorker = new ReportWorker(connectionString,
postConnectionString,
queue, cacheSize, reportWithinMs);
reportWorker.init();
@@ -113,6 +130,76 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
this.cacheSize = cacheSize;
}
+ /**
+ * Whether to drop (if it exists) and recreate the report table when the
+ * reporter's worker is initialized. The default is true. In a distributed
+ * setting with multiple servers, this should be set to false, and you'll
+ * need to set up the table on your own.
+ * <p>
+ * <b>NOTE</b> The default behavior is to drop the table if it exists and
+ * then create it. Make sure to set this to false if you do not want
+ * to drop the table. The table created by default has the columns
+ * id varchar(1024), status varchar(32) and timestamp timestamp with time zone.
+ * <p>
+ * When this is false, the table must already exist, because the report
+ * statement is prepared against it when the worker is initialized.
+ * @param createTable true to drop and recreate the table at startup;
+ * false to write into a table that already exists
+ */
+ @Field
+ public void setCreateTable(boolean createTable) {
+ this.createTable = createTable;
+ }
+
+ /**
+ * Sets the name of the table the reporter writes to.
+ * The default is {@link JDBCPipesReporter#TABLE_NAME}.
+ * <p>
+ * The name is concatenated directly into the drop/create statements and
+ * into the default report SQL, so it must be a trusted, valid SQL
+ * identifier. A custom {@link #setReportSql(String)} is used verbatim and
+ * must name this table itself.
+ * @param tableName name of the report table
+ */
+ @Field
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ /**
+ * Sets the SQL for the prepared statement that is executed to store each
+ * report record. The default is:
+ * <code>insert into TABLE (id, status, timestamp) values (?,?,?)</code>
+ * where TABLE is the configured table name
+ * ({@link JDBCPipesReporter#TABLE_NAME} unless changed via
+ * {@link #setTableName(String)}).
+ * <p>
+ * This can be modified for specific dialects of SQL or to run an upsert,
+ * merge or update instead of the default insert.
+ * <p>
+ * Users need to coordinate the order of the {@code ?} parameters with
+ * {@link #setReportVariables(List)}.
+ * @param reportSql SQL with one {@code ?} placeholder per report variable
+ */
+ @Field
+ public void setReportSql(String reportSql) {
+ this.reportSql = reportSql;
+ }
+
+ /**
+ * @return the table name used by the default report SQL and, when
+ * {@link #isCreateTable()} is true, by the drop/create statements
+ */
+ public String getTableName() {
+ return tableName;
+ }
+
+ /**
+ * @return the ordered variable names bound to the report SQL's parameters;
+ * null until initialization, which fills in ["id", "status", "timestamp"]
+ * when nothing was configured
+ */
+ public List<String> getReportVariables() {
+ return reportVariables;
+ }
+
+ /**
+ * @return the SQL of the prepared statement used to write each report;
+ * null until initialization, which builds an insert into
+ * {@link #getTableName()} when nothing was configured
+ */
+ public String getReportSql() {
+ return reportSql;
+ }
+
+ /**
+ * @return whether the reporter drops (if present) and recreates the table
+ * when its worker is initialized
+ */
+ public boolean isCreateTable() {
+ return createTable;
+ }
+
+ /**
+ * ADVANCED: This is used to set the variables in the prepared statement for
+ * the report. This needs to be coordinated with {@link #setReportSql(String)}:
+ * the i-th variable is bound to the i-th {@code ?} parameter.
+ * The available variables are "id", "status" and "timestamp"; any other
+ * name causes an IllegalArgumentException when reports are written.
+ * If you're modifying to an update statement like
+ * "update tika_status set status=?, timestamp=? where id = ?"
+ * then the values for this would be ["status", "timestamp", "id"].
+ * <p>
+ * The default for the insert is ["id", "status", "timestamp"]
+ * @param variables ordered variable names, one per SQL parameter
+ */
+
+ @Field
+ public void setReportVariables(List<String> variables) {
+ reportVariables = variables;
+ }
/**
* Commit the reports if the amount of time elapsed since the last report
commit
@@ -205,7 +292,7 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
}
}
- private static class ReportWorker implements Runnable {
+ private class ReportWorker implements Runnable {
private static final int MAX_TRIES = 3;
private final String connectionString;
@@ -233,7 +320,10 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
public void init() throws TikaConfigException {
try {
createConnection();
- createTable();
+ if (isCreateTable()) {
+ createTable();
+ }
+ //table must exist for this to work
createPreparedStatement();
} catch (SQLException e) {
throw new TikaConfigException("Problem creating connection,
etc", e);
@@ -301,9 +391,7 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
try {
for (IdStatusPair p : cache) {
insert.clearParameters();
- insert.setString(1, p.id);
- insert.setString(2, p.status.name());
- insert.setTimestamp(3, Timestamp.from(Instant.now()));
+ updateInsert(insert, p.id, p.status.name(),
Timestamp.from(Instant.now()));
insert.addBatch();
}
insert.executeBatch();
@@ -317,11 +405,31 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
}
}
+ /**
+ * Binds one report's values to the prepared statement's parameters in the
+ * order given by {@code reportVariables}: parameter {@code i + 1} receives
+ * the value named by the {@code i}-th variable.
+ * <p>
+ * NOTE(review): an unsupported variable name is only detected here, i.e.
+ * every time a batch is written; validating the list once during
+ * initialization would fail fast with a TikaConfigException instead.
+ *
+ * @param insert the prepared statement built from the report SQL
+ * @param id value bound wherever "id" appears in the variables
+ * @param status value bound wherever "status" appears in the variables
+ * @param timestamp value bound wherever "timestamp" appears in the variables
+ * @throws SQLException if the driver rejects a parameter
+ * @throws IllegalArgumentException if a variable is not id, status or timestamp
+ */
+ private void updateInsert(PreparedStatement insert, String id,
+ String status,
+ Timestamp timestamp) throws SQLException {
+ int parameterIndex = 1;
+ for (String name : reportVariables) {
+ //a String switch replaces the original chain of equals() checks
+ switch (name) {
+ case "timestamp":
+ insert.setTimestamp(parameterIndex, timestamp);
+ break;
+ case "id":
+ insert.setString(parameterIndex, id);
+ break;
+ case "status":
+ insert.setString(parameterIndex, status);
+ break;
+ default:
+ throw new IllegalArgumentException("I expected one of (id, status, timestamp)" +
+ ", but I got: " + name);
+ }
+ parameterIndex++;
+ }
+ }
+
private void createTable() throws SQLException {
try (Statement st = connection.createStatement()) {
- String sql = "drop table if exists " + TABLE_NAME;
+ String sql = "drop table if exists " + getTableName();
st.execute(sql);
- sql = "create table " + TABLE_NAME + " (id varchar(1024),
status varchar(32), " +
+ sql = "create table " + getTableName() + " (id varchar(1024),
status varchar(32), " +
"timestamp timestamp with time zone)";
st.execute(sql);
}
@@ -375,9 +483,7 @@ public class JDBCPipesReporter extends PipesReporterBase
implements Initializabl
}
private void createPreparedStatement() throws SQLException {
- //do we want to do an upsert?
- String sql = "insert into " + TABLE_NAME + " (id, status,
timestamp) values (?,?,?)";
- insert = connection.prepareStatement(sql);
+ //getReportSql() is either the user-configured SQL (which may be an
+ //upsert/merge/update) or the default insert built during initialization
+ insert = connection.prepareStatement(getReportSql());
}
}
diff --git
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index c16816c30..01d903c5e 100644
---
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -147,6 +147,39 @@ public class TestJDBCPipesReporter {
assertEquals(numThreads * numIterations, sum);
}
+ @Test
+ public void testAdvanced(@TempDir Path tmpDir) throws Exception {
+ //this only tests configuration loading; the custom update statement and
+ //the reordered report variables are not yet exercised against the db
+ Files.createDirectories(tmpDir.resolve("db"));
+ Path dbDir = tmpDir.resolve("db/h2");
+ Path config = tmpDir.resolve("tika-config.xml");
+ String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+ writeConfig("/configs/tika-config-advanced.xml",
+ connectionString, config);
+
+ //build the table outside of the reporter -- we set createTable=false,
+ //and the reporter prepares its statement against an existing table
+ //(presumably while the config is loaded below)
+ try (Connection connection = DriverManager.getConnection(connectionString)) {
+ try (Statement st = connection.createStatement()) {
+ st.execute("create table my_tika_status (id varchar(256), status varchar" +
+ "(256), timestamp timestamp with time zone)");
+ }
+ }
+
+ AsyncConfig asyncConfig = AsyncConfig.load(config);
+ JDBCPipesReporter reporter = (JDBCPipesReporter)asyncConfig.getPipesReporter();
+ assertEquals("update my_tika_status set status=?, timestamp=? where id=?",
+ reporter.getReportSql());
+ assertFalse(reporter.isCreateTable());
+ //the configured tableName must override the TABLE_NAME default
+ assertEquals("my_tika_status", reporter.getTableName());
+
+ List<String> expected = new ArrayList<>();
+ expected.add("status");
+ expected.add("timestamp");
+ expected.add("id");
+
+ assertEquals(expected, reporter.getReportVariables());
+ }
+
private Map<PipesResult.STATUS, Long> countReported(String
connectionString) throws
SQLException {
diff --git
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-advanced.xml
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-advanced.xml
new file mode 100644
index 000000000..e3cf102e2
--- /dev/null
+++
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-advanced.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<properties>
+ <async>
+ <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+ <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+ <emitWithinMillis>60000</emitWithinMillis>
+ <numEmitters>1</numEmitters>
+ <numClients>3</numClients>
+ <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+ <forkedJvmArgs>
+ <arg>-Xmx512m</arg>
+ <arg>-XX:ParallelGCThreads=2</arg>
+ <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+ </forkedJvmArgs>
+ <timeoutMillis>60000</timeoutMillis>
+ <pipesReporter
class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
+ <connection>CONNECTION_STRING</connection>
+ <includes>
+ <include>PARSE_SUCCESS</include>
+ <include>PARSE_SUCCESS_WITH_EXCEPTION</include>
+ </includes>
+ <tableName>my_tika_status</tableName>
+ <createTable>false</createTable>
+ <reportSql>update my_tika_status set status=?, timestamp=? where
id=?</reportSql>
+ <reportVariables>
+ <v>status</v>
+ <v>timestamp</v>
+ <v>id</v>
+ </reportVariables>
+ </pipesReporter>
+ </async>
+</properties>