aromanenko-dev commented on a change in pull request #15049:
URL: https://github.com/apache/beam/pull/15049#discussion_r659784617
##########
File path:
sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcUtil.java
##########
@@ -252,4 +262,67 @@ private static Calendar withTimestampAndTimezone(DateTime dateTime) {
return calendar;
}
+
+  /**
+   * Splits the inclusive value range {@code [lowerBound, upperBound]} of a partitioning column into
+   * at most {@code numPartitions} contiguous, non-overlapping ranges.
+   *
+   * <p>The input element is a list {@code [lowerBound, upperBound, numPartitions]}. Each output
+   * element is {@code KV.of("from,to", 1)}, describing the half-open range {@code from <= v < to}.
+   * The last range ends at {@code upperBound + 1}, so the maximum value is included. If
+   * {@code upperBound < lowerBound}, nothing is emitted.
+   *
+   * @throws IllegalArgumentException if {@code numPartitions} is not positive
+   */
+  static class PartitioningFn extends DoFn<List<Integer>, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      List<Integer> params = c.element();
+      // Long arithmetic so bounds near Integer.MIN_VALUE / MAX_VALUE cannot overflow below.
+      long lowerBound = params.get(0);
+      long upperBound = params.get(1);
+      int numPartitions = params.get(2);
+      if (numPartitions <= 0) {
+        throw new IllegalArgumentException(
+            "numPartitions must be positive, got " + numPartitions);
+      }
+      long stride = (upperBound - lowerBound) / numPartitions + 1;
+      // stride * numPartitions > upperBound - lowerBound, so at most numPartitions ranges are
+      // emitted. Each range starts where the previous one ended, which covers every value in
+      // [lowerBound, upperBound] exactly once (no gaps, no overlaps).
+      for (long from = lowerBound; from <= upperBound; from += stride) {
+        long to = Math.min(from + stride, upperBound + 1);
+        c.output(KV.of(String.format("%s,%s", from, to), 1));
+      }
+    }
+  }
+
+ /**
+ * Select maximal and minimal value from a table by partitioning column.
+ *
+ * @return pair of integers corresponds to the upper and lower bounds.
+ */
+ static Integer[] getBounds(
+ PBegin input,
+ String table,
+ SerializableFunction<Void, DataSource> providerFunctionFn,
+ String partitionColumn) {
+ final Integer[] bounds = {0, 0};
+ input
+ .apply(
+ String.format("Read min and max value by %s", partitionColumn),
+ JdbcIO.<String>read()
+ .withDataSourceProviderFn(providerFunctionFn)
+ .withQuery(
+ String.format("select min(%1$s), max(%1$s) from %2$s",
partitionColumn, table))
+ .withRowMapper(
+ (JdbcIO.RowMapper<String>)
+ resultSet ->
+ String.join(
+ ",", Arrays.asList(resultSet.getString(1),
resultSet.getString(2))))
+ .withOutputParallelization(false)
+ .withCoder(StringUtf8Coder.of()))
+ .apply(
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ List<String> elements =
Splitter.on(',').splitToList(context.element());
+ bounds[0] =
Integer.parseInt(Objects.requireNonNull(elements.get(0)));
+ bounds[1] =
Integer.parseInt(Objects.requireNonNull(elements.get(1)));
+ context.output(context.element());
+ }
+ }));
+ return bounds;
Review comment:
Ping on this
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]