This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


The following commit(s) were added to refs/heads/main by this push:
     new 28941613f TIKA-4234 (#1708)
28941613f is described below

commit 28941613fe02622fba79a7544db1270b3dabac92
Author: Tim Allison <[email protected]>
AuthorDate: Thu Apr 4 10:31:46 2024 -0400

    TIKA-4234 (#1708)
    
    * TIKA_4234 -- improve jdbc reporter
---
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 126 +++++++++++++++++++--
 .../reporters/jdbc/TestJDBCPipesReporter.java      |  33 ++++++
 .../resources/configs/tika-config-advanced.xml     |  50 ++++++++
 3 files changed, 199 insertions(+), 10 deletions(-)

diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
index ee52bf80f..a1ffc8b15 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java
@@ -69,6 +69,14 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
 
     private String connectionString;
 
+    // Name of the status table; used when creating the table and when building
+    // the default reportSql.
+    private String tableName = TABLE_NAME;
+
+    // When true (the default), initialization drops and re-creates the table.
+    private boolean createTable = true;
+
+    // Prepared-statement SQL for one report; null until set, then defaulted to an insert.
+    private String reportSql;
+
+    // Names bound, in order, to the reportSql placeholders; null until set, then
+    // defaulted to ["id", "status", "timestamp"].
+    private List<String> reportVariables;
+
     private Optional<String> postConnectionString = Optional.empty();
     private final ArrayBlockingQueue<IdStatusPair> queue =
             new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE);
@@ -80,6 +88,15 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         if (StringUtils.isBlank(connectionString)) {
             throw new TikaConfigException("Must specify a connectionString");
         }
+        if (reportVariables == null) {
+            reportVariables = new ArrayList<>();
+            reportVariables.add("id");
+            reportVariables.add("status");
+            reportVariables.add("timestamp");
+        }
+        if (reportSql == null) {
+            reportSql = "insert into " + getTableName() + " (id, status, 
timestamp) values (?,?,?)";
+        }
         ReportWorker reportWorker = new ReportWorker(connectionString, 
postConnectionString,
                 queue, cacheSize, reportWithinMs);
         reportWorker.init();
@@ -113,6 +130,76 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         this.cacheSize = cacheSize;
     }
 
+    /**
+     * Sets whether the reporter creates its status table during initialization.
+     * The default is {@code true}.
+     * <p>
+     * <b>WARNING:</b> when {@code true}, initialization drops the table if it
+     * exists and then re-creates it, so previously reported rows are lost.
+     * Set this to {@code false} to keep an existing table, e.g. in a distributed
+     * setting with multiple servers. In that case you must set up the table
+     * yourself, and it must exist before the reporter starts, because the report
+     * statement is prepared against it during initialization.
+     *
+     * @param createTable {@code true} to drop and re-create the table on startup;
+     *                    {@code false} to use a pre-existing table
+     */
+    @Field
+    public void setCreateTable(boolean createTable) {
+        this.createTable = createTable;
+    }
+
+    /**
+     * Sets the name of the status table. The default is
+     * {@link JDBCPipesReporter#TABLE_NAME}.
+     * <p>
+     * This name is used when the table is created (see
+     * {@link #setCreateTable(boolean)}) and when building the default report SQL.
+     * It is NOT substituted into SQL supplied via {@link #setReportSql(String)};
+     * custom SQL must name the table itself.
+     * <p>
+     * The value is concatenated directly into SQL statements, so it must be a
+     * trusted, valid SQL identifier.
+     *
+     * @param tableName name of the table that receives report rows
+     */
+    @Field
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
+    /**
+     * Sets the SQL for the prepared statement that stores each report record.
+     * The default is
+     * <code>insert into TABLE (id, status, timestamp) values (?,?,?)</code>,
+     * where {@code TABLE} is the configured table name
+     * ({@link JDBCPipesReporter#TABLE_NAME} unless changed via
+     * {@link #setTableName(String)}).
+     * <p>
+     * This can be modified for specific dialects of SQL or to run an upsert,
+     * merge or update instead of the default insert.
+     * <p>
+     * Users need to coordinate this with {@link #setReportVariables(List)}:
+     * there must be one {@code ?} placeholder per report variable, in the same order.
+     *
+     * @param reportSql SQL for the report prepared statement
+     */
+    @Field
+    public void setReportSql(String reportSql) {
+        this.reportSql = reportSql;
+    }
+
+    /**
+     * @return whether the table is dropped and re-created during initialization
+     */
+    public boolean isCreateTable() {
+        return this.createTable;
+    }
+
+    /**
+     * @return the name of the status table
+     */
+    public String getTableName() {
+        return this.tableName;
+    }
+
+    /**
+     * @return the SQL used for the report prepared statement; {@code null} until
+     *         set or defaulted during initialization
+     */
+    public String getReportSql() {
+        return this.reportSql;
+    }
+
+    /**
+     * @return the variable names bound, in order, to the report SQL placeholders;
+     *         {@code null} until set or defaulted during initialization
+     */
+    public List<String> getReportVariables() {
+        return this.reportVariables;
+    }
+    /**
+     * ADVANCED: Sets the order in which report values are bound to the
+     * {@code ?} placeholders of the report SQL. This needs to be coordinated with
+     * {@link #setReportSql(String)}.
+     * <p>
+     * The available variables are {@code id}, {@code status} and {@code timestamp}.
+     * If you're modifying the SQL to an update statement like
+     * {@code update tika_status set status=?, timestamp=? where id=?},
+     * then the values for this would be ["status", "timestamp", "id"].
+     * <p>
+     * The default, matching the default insert, is ["id", "status", "timestamp"].
+     * The given list is copied, so later changes to it do not affect this reporter.
+     *
+     * @param variables variable names in placeholder order; {@code null} selects
+     *                  the default order
+     */
+    @Field
+    public void setReportVariables(List<String> variables) {
+        //defensive copy: the report worker reads this list for every record it writes
+        reportVariables = variables == null ? null : new ArrayList<>(variables);
+    }
 
     /**
      * Commit the reports if the amount of time elapsed since the last report 
commit
@@ -205,7 +292,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
     }
 
-    private static class ReportWorker implements Runnable {
+    private class ReportWorker implements Runnable {
 
         private static final int MAX_TRIES = 3;
         private final String connectionString;
@@ -233,7 +320,10 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         public void init() throws TikaConfigException {
             try {
                 createConnection();
-                createTable();
+                if (isCreateTable()) {
+                    createTable();
+                }
+                //table must exist for this to work
                 createPreparedStatement();
             } catch (SQLException e) {
                 throw new TikaConfigException("Problem creating connection, 
etc", e);
@@ -301,9 +391,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
                 try {
                     for (IdStatusPair p : cache) {
                         insert.clearParameters();
-                        insert.setString(1, p.id);
-                        insert.setString(2, p.status.name());
-                        insert.setTimestamp(3, Timestamp.from(Instant.now()));
+                        updateInsert(insert, p.id, p.status.name(), 
Timestamp.from(Instant.now()));
                         insert.addBatch();
                     }
                     insert.executeBatch();
@@ -317,11 +405,31 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
             }
         }
 
+        /**
+         * Binds one report record to {@code insert}. The name at each position of
+         * {@code reportVariables} selects which value is bound to the placeholder
+         * at that position (JDBC parameter indexes start at 1).
+         *
+         * @param insert    the prepared report statement
+         * @param id        value bound wherever "id" appears
+         * @param status    value bound wherever "status" appears
+         * @param timestamp value bound wherever "timestamp" appears
+         * @throws SQLException             if the driver rejects a parameter
+         * @throws IllegalArgumentException if a name is not id, status or timestamp
+         */
+        private void updateInsert(PreparedStatement insert, String id,
+                                  String status,
+                                  Timestamp timestamp) throws SQLException {
+            int paramIndex = 1;
+            for (String variable : reportVariables) {
+                switch (variable) {
+                    case "id":
+                        insert.setString(paramIndex, id);
+                        break;
+                    case "status":
+                        insert.setString(paramIndex, status);
+                        break;
+                    case "timestamp":
+                        insert.setTimestamp(paramIndex, timestamp);
+                        break;
+                    default:
+                        throw new IllegalArgumentException(
+                                "I expected one of (id, status, timestamp)" +
+                                        ", but I got: " + variable);
+                }
+                paramIndex++;
+            }
+        }
+
         private void createTable() throws SQLException {
             try (Statement st = connection.createStatement()) {
-                String sql = "drop table if exists " + TABLE_NAME;
+                String sql = "drop table if exists " + getTableName();
                 st.execute(sql);
-                sql = "create table " + TABLE_NAME + " (id varchar(1024), 
status varchar(32), " +
+                sql = "create table " + getTableName() + " (id varchar(1024), 
status varchar(32), " +
                         "timestamp timestamp with time zone)";
                 st.execute(sql);
             }
@@ -375,9 +483,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl
         }
 
         private void createPreparedStatement() throws SQLException {
-            //do we want to do an upsert?
-            String sql = "insert into " + TABLE_NAME + " (id, status, timestamp) values (?,?,?)";
-            insert = connection.prepareStatement(sql);
+            //the sql is the configured report sql (see setReportSql);
+            //any table it references must already exist
+            String sql = getReportSql();
+            insert = connection.prepareStatement(sql);
         }
     }
 
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
index c16816c30..01d903c5e 100644
--- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java
@@ -147,6 +147,39 @@ public class TestJDBCPipesReporter {
         assertEquals(numThreads * numIterations, sum);
     }
 
+    @Test
+    public void testAdvanced(@TempDir Path tmpDir) throws Exception {
+        //this only tests configuration. we should add an actual unit test
+        Files.createDirectories(tmpDir.resolve("db"));
+        Path dbDir = tmpDir.resolve("db/h2");
+        Path config = tmpDir.resolve("tika-config.xml");
+        String connectionString = "jdbc:h2:file:" + dbDir.toAbsolutePath();
+
+        writeConfig("/configs/tika-config-advanced.xml",
+                connectionString, config);
+
+        //build the table outside of the reporter -- we set createTable=false,
+        //and the report statement is prepared against this table at startup
+        try (Connection connection = DriverManager.getConnection(connectionString)) {
+            try (Statement st = connection.createStatement()) {
+                st.execute("create table my_tika_status (id varchar(256), status varchar" +
+                        "(256), timestamp timestamp with time zone)");
+            }
+        }
+
+        AsyncConfig asyncConfig = AsyncConfig.load(config);
+        JDBCPipesReporter reporter = (JDBCPipesReporter) asyncConfig.getPipesReporter();
+        //every advanced setting in the config should be picked up
+        assertEquals("my_tika_status", reporter.getTableName());
+        assertEquals("update my_tika_status set status=?, timestamp=? where id=?",
+                reporter.getReportSql());
+        assertFalse(reporter.isCreateTable());
+
+        List<String> expected = new ArrayList<>();
+        expected.add("status");
+        expected.add("timestamp");
+        expected.add("id");
+
+        assertEquals(expected, reporter.getReportVariables());
+    }
+
 
     private Map<PipesResult.STATUS, Long> countReported(String 
connectionString) throws
             SQLException {
diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-advanced.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-advanced.xml
new file mode 100644
index 000000000..e3cf102e2
--- /dev/null
+++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/configs/tika-config-advanced.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<!--
+  Exercises the advanced JDBCPipesReporter settings:
+  - createTable=false: the reporter neither drops nor creates the table, so
+    my_tika_status must already exist when the reporter is initialized.
+  - reportSql: a custom UPDATE replaces the default INSERT; it names the table
+    itself (tableName is only used for table creation and the default SQL).
+  - reportVariables: bound in order to the ? placeholders of reportSql.
+-->
+<properties>
+  <async>
+    <maxForEmitBatchBytes>10000</maxForEmitBatchBytes>
+    <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes>
+    <emitWithinMillis>60000</emitWithinMillis>
+    <numEmitters>1</numEmitters>
+    <numClients>3</numClients>
+    <tikaConfig>{TIKA_CONFIG}</tikaConfig>
+    <forkedJvmArgs>
+      <arg>-Xmx512m</arg>
+      <arg>-XX:ParallelGCThreads=2</arg>
+      <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg>
+    </forkedJvmArgs>
+    <timeoutMillis>60000</timeoutMillis>
+    <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter">
+      <connection>CONNECTION_STRING</connection>
+      <includes>
+        <include>PARSE_SUCCESS</include>
+        <include>PARSE_SUCCESS_WITH_EXCEPTION</include>
+      </includes>
+      <tableName>my_tika_status</tableName>
+      <createTable>false</createTable>
+      <reportSql>update my_tika_status set status=?, timestamp=? where id=?</reportSql>
+      <reportVariables>
+        <v>status</v>
+        <v>timestamp</v>
+        <v>id</v>
+      </reportVariables>
+    </pipesReporter>
+  </async>
+</properties>

Reply via email to