1. Project Clover database Wed Nov 12 2025 05:07:35 UTC
  2. Package guru.mikelue.foxglove.jdbc

File JdbcTxWorker.java

 

Coverage histogram

../../../../img/srcFileCovDistChart9.png
53% of files have more coverage

Code metrics

24
75
12
5
293
210
30
0.4
6.25
2.4
2.5

Classes

Class Line # Actions
JdbcTxWorker 24 34 0% 14 1
0.981481598.1%
JdbcTxWorker.InsertionContext 28 0 - 0 0
-1.0 -
BatchWorker 157 13 0% 4 2
0.894736889.5%
PluralBatchWorker 202 17 0% 7 0
1.0100%
SingleBatchWorker 255 11 0% 5 14
0.00%
 

Contributing tests

This file is covered by 47 tests.

Source view

1    package guru.mikelue.foxglove.jdbc;
2   
3    import java.sql.Connection;
4    import java.sql.JDBCType;
5    import java.sql.PreparedStatement;
6    import java.sql.SQLException;
7    import java.util.List;
8    import java.util.Map;
9    import java.util.function.Consumer;
10    import java.util.function.Supplier;
11    import java.util.regex.Pattern;
12   
13    import org.slf4j.Logger;
14    import org.slf4j.LoggerFactory;
15   
16    import guru.mikelue.foxglove.ColumnMeta;
17    import guru.mikelue.foxglove.TupleAccessor;
18   
19    import static java.sql.Statement.RETURN_GENERATED_KEYS;
20   
21    /**
22    * This worker is responsible for performing JDBC transaction for insertion of generated rows.
23    */
 
24    class JdbcTxWorker implements AutoCloseable {
25    /**
26    * The context make JdbcTxWorker easier to be tested.
27    */
 
28    record InsertionContext (
29    String sql, int numberOfRows,
30    String[] namesOfGeneratedColumns,
31    Supplier<Map<ColumnMeta, Object>> rowParamsGenerator
32    ) {}
33   
34    private Logger logger = LoggerFactory.getLogger(JdbcTxWorker.class);
35   
36    private final Connection conn;
37    private final TransactionGear txGear;
38    private final boolean oldAutoCommit;
39   
 
40  70 toggle JdbcTxWorker(
41    TransactionGear txGear
42    ) throws SQLException {
43  70 this.txGear = txGear;
44  70 this.conn = txGear.connection();
45   
46  70 oldAutoCommit = conn.getAutoCommit();
47   
48  70 if (!txGear.joinConnection()) {
49  60 conn.setAutoCommit(false);
50    }
51    }
52   
53    private int unCommittedNumberOfRows = 0;
54   
 
55  70 toggle @Override
56    public void close() throws SQLException
57    {
58  70 logger.debug("Closing JDBC transaction worker");
59  70 commitIfNeeded();
60   
61  70 if (!txGear.joinConnection()) {
62  60 conn.setAutoCommit(oldAutoCommit);
63    }
64    }
65   
66    /**
67    * This is stateful method, which would keep track of uncommitted rows.
68    *
69    * The {@link #close()} method would commit remaining uncommitted rows
70    * if joinConnection is false.
71    */
 
72  108 toggle int performInsert(
73    InsertionContext context,
74    Consumer<List<TupleAccessor>> generatedValuesConsumer
75    ) throws SQLException
76    {
77  108 var insertSql = context.sql();
78  108 var namesOfGeneratedColumns = context.namesOfGeneratedColumns();
79   
80  108 logger.debug(
81    "Going to insert [{}] rows by SQL:\n\t{}",
82    context.numberOfRows(), insertSql
83    );
84   
85  108 try (var stmt = namesOfGeneratedColumns.length > 0 ?
86    conn.prepareStatement(insertSql, namesOfGeneratedColumns) :
87    conn.prepareStatement(insertSql, RETURN_GENERATED_KEYS)
88    ) {
89  108 var batchWorker = BatchWorker.newInstance(
90    conn.getMetaData().getDriverName(),
91    stmt, namesOfGeneratedColumns, generatedValuesConsumer,
92    txGear.batchSize()
93    );
94  108 var rowParamsGenerator = context.rowParamsGenerator();
95   
96  1705 for (int rowIndex = 0; rowIndex < context.numberOfRows(); rowIndex++) {
97  1597 var valuesOfRow = rowParamsGenerator.get();
98   
99  1597 if (logger.isDebugEnabled()) {
100  1597 logger.trace("Preparing row[{}] for insertion: {}",
101    rowIndex, valuesOfRow.values());
102    }
103   
104  1597 batchWorker.addBatch(valuesOfRow);
105   
106  1597 unCommittedNumberOfRows++;
107  1597 if (unCommittedNumberOfRows >= txGear.batchSize()) {
108  32 commitIfNeeded();
109    }
110    }
111   
112    /*
113    * If there are existing statements not executed, execute them here.
114    */
115  108 batchWorker.executeBatch();
116    // :~)
117    }
118   
119  108 return context.numberOfRows();
120    }
121   
 
122  102 toggle private void commitIfNeeded() throws SQLException
123    {
124  102 if (txGear.joinConnection()) {
125  26 logger.debug("Skip committing [{}] remaining statements because of joining existing transaction.", unCommittedNumberOfRows);
126  26 unCommittedNumberOfRows = 0;
127  26 return;
128    }
129   
130  76 if (unCommittedNumberOfRows > 0) {
131  74 logger.debug("Committing [{}] remaining statements of batch[{}].", unCommittedNumberOfRows, txGear.batchSize());
132  74 conn.commit();
133  74 unCommittedNumberOfRows = 0;
134    }
135    }
136    }
137   
138    /**
139    * Since {@link PreparedStatement#getGeneratedKeys()} behaves differently
140    * across different databases, this interface is used to implement different strategy.
141    *
142    * <ul>
143    * <li>Derby, SQLite - Only get generated value of latest inserted row</li>
144    * <li>MSSQL - Only able to get value by {@link PreparedStatement#executeUpdate()}</li>
145    * <li>Otherwise, gets the generated values by {@link PreparedStatement#executeBatch}</li>
146    * </ul>
147    *
148    * Microsoft JDBC Driver 11.2 for SQL Server
149    * Apache Derby Embedded JDBC Driver
150    * SQLite JDBC
151    *
152    * HSQL Database Engine Driver
153    * PostgreSQL JDBC Driver
154    * MySQL Connector/J
155    * Oracle JDBC driver
156    */
 
157    interface BatchWorker {
158    static Pattern DRIVER_FOR_SINGLE_WORKER = Pattern.compile(
159    "(?i).*(derby|sqlite|microsoft).*"
160    );
161   
 
162  108 toggle static BatchWorker newInstance(
163    String dbDriverName,
164    PreparedStatement stmt, String[] askedGeneratedColumns,
165    Consumer<List<TupleAccessor>> generatedValuesConsumer,
166    int batchSize
167    ) {
168  108 if (DRIVER_FOR_SINGLE_WORKER.matcher(dbDriverName).matches()) {
169  0 return new SingleBatchWorker(
170    stmt, askedGeneratedColumns, generatedValuesConsumer
171    );
172    }
173   
174  108 return new PluralBatchWorker(
175    stmt, askedGeneratedColumns, generatedValuesConsumer, batchSize
176    );
177    }
178   
179    void addBatch(Map<ColumnMeta, Object> paramSet) throws SQLException;
180    void executeBatch() throws SQLException;
181   
 
182  1597 toggle static void setParams(
183    PreparedStatement stmt, Map<ColumnMeta, Object> paramSet
184    ) throws SQLException {
185  1597 int paramIndex = 1;
186  1597 for (ColumnMeta columnMeta: paramSet.keySet()) {
187  12032 Object value = paramSet.get(columnMeta);
188  12032 JDBCType jdbcType = columnMeta.jdbcType();
189   
190  12032 if (value == null) {
191  99 stmt.setNull(paramIndex, jdbcType.getVendorTypeNumber());
192  99 paramIndex++;
193  99 continue;
194    }
195   
196  11933 stmt.setObject(paramIndex, value, jdbcType.getVendorTypeNumber());
197  11933 paramIndex++;
198    }
199    }
200    }
201   
 
202    class PluralBatchWorker implements BatchWorker {
203    private Logger logger = LoggerFactory.getLogger(PluralBatchWorker.class);
204   
205    private final PreparedStatement stmt;
206    private final Consumer<List<TupleAccessor>> generatedValuesConsumer;
207    private final GeneratedValueLoader generatedValueLoader;
208    private final int batchSize;
209    private int unExecutedNumberOfRows = 0;
210   
 
211  108 toggle PluralBatchWorker(
212    PreparedStatement stmt, String[] askedGeneratedColumns,
213    Consumer<List<TupleAccessor>> generatedValuesConsumer,
214    int batchSize
215    ) {
216  108 this.stmt = stmt;
217  108 this.generatedValueLoader = new GeneratedValueLoader(askedGeneratedColumns);
218  108 this.generatedValuesConsumer = generatedValuesConsumer;
219  108 this.batchSize = batchSize;
220    }
221   
 
222  1597 toggle @Override
223    public void addBatch(Map<ColumnMeta, Object> paramSet) throws SQLException
224    {
225  1597 BatchWorker.setParams(stmt, paramSet);
226   
227  1597 stmt.addBatch();
228  1597 unExecutedNumberOfRows++;
229   
230  1597 if (unExecutedNumberOfRows >= batchSize) {
231  28 executeBatch();
232    }
233    }
234   
 
235  136 toggle @Override
236    public void executeBatch() throws SQLException
237    {
238  136 if (unExecutedNumberOfRows == 0) {
239  6 return;
240    }
241   
242  130 logger.debug("Executing for [{}] statements of batch[{}]", unExecutedNumberOfRows, batchSize);
243   
244  130 stmt.executeBatch();
245  130 unExecutedNumberOfRows = 0;
246   
247  130 try (var rs = stmt.getGeneratedKeys()) {
248  130 generatedValuesConsumer.accept(
249    generatedValueLoader.toTuples(rs)
250    );
251    }
252    }
253    }
254   
 
255    class SingleBatchWorker implements BatchWorker {
256    private Logger logger = LoggerFactory.getLogger(SingleBatchWorker.class);
257   
258    private final PreparedStatement stmt;
259    private final Consumer<List<TupleAccessor>> generatedValuesConsumer;
260    private final GeneratedValueLoader generatedValueLoader;
261    private int counter = 0;
262   
 
263  0 toggle SingleBatchWorker(
264    PreparedStatement stmt, String[] askedGeneratedColumns,
265    Consumer<List<TupleAccessor>> generatedValuesConsumer
266    ) {
267  0 this.stmt = stmt;
268  0 this.generatedValueLoader = new GeneratedValueLoader(askedGeneratedColumns);
269  0 this.generatedValuesConsumer = generatedValuesConsumer;
270    }
271   
 
272  0 toggle @Override
273    public void addBatch(Map<ColumnMeta, Object> paramSet) throws SQLException
274    {
275  0 BatchWorker.setParams(stmt, paramSet);
276   
277  0 stmt.executeUpdate();
278  0 counter++;
279   
280  0 try (var rs = stmt.getGeneratedKeys()) {
281  0 generatedValuesConsumer.accept(
282    generatedValueLoader.toTuples(rs)
283    );
284    }
285    }
286   
 
287  0 toggle @Override
288    public void executeBatch() throws SQLException
289    {
290  0 logger.debug("Have executed [{}] statements individually", counter);
291  0 counter = 0;
292    }
293    }