I need to compute averages of time series data over a 15-minute tumbling event-time window, with backfill for windows in which no data arrived.
The time series data is a Tuple3 of (name: String, value: double, event_time: Timestamp (Instant)). I need to compute the average value of each name's time series over a tumbling window of 15 minutes, with backfill, such that given the input:

    a,10,2020-06-23T00:01:30.0000000Z
    a,15,2020-06-23T00:02:30.0000000Z
    a,20,2020-06-23T00:03:30.0000000Z
    b,25,2020-06-23T00:03:30.0000000Z
    b,30,2020-06-23T00:02:30.0000000Z
    b,35,2020-06-23T00:01:30.0000000Z
    b,35,2020-06-23T00:16:30.0000000Z

it yields the following averages with backfill:

    a,15,2020-06-23 00:00:00.0
    b,30,2020-06-23 00:00:00.0
    a,15,2020-06-23 00:15:00.0
    b,35,2020-06-23 00:15:00.0

Notice that although no value arrived for "a" in the second quarter-hour, the previous average was upserted. I have only gotten as far as computing the average; I have not figured out a recommended strategy for upserting the backfill. I made a GitHub project to share my approach: https://github.com/minmay/flink-patterns And the following code demonstrates my approach thus far. Can somebody please provide guidance on the "Flink"-recommended way of assigning a backfill to an average on a keyed windowed stream?
package mvillalobos.flink.patterns.timeseries.average;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;

/**
 * Reads a CSV time series ({@code name,value,event_time}), upserts every raw sample into the
 * {@code time_series} table of an in-memory Derby database, and additionally upserts one
 * per-name average row for every 15 minute tumbling event-time window.
 *
 * <p>Backfilling is NOT implemented here: the {@code is_backfill} field (f6) is always written as
 * {@code false}, and a window that receives no elements for a name produces no row.
 */
public class TimeSeriesAverageApp {

    private static final Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class);

    /** Size of the tumbling event-time window used for the averages. */
    private static final Time WINDOW_SIZE = Time.minutes(15);

    /** {@link #WINDOW_SIZE} in milliseconds; stored in tuple field f1 of every aggregate row. */
    private static final int WINDOW_SIZE_MILLIS = Math.toIntExact(WINDOW_SIZE.toMilliseconds());

    /** Lateness passed to {@code allowedLateness} on the windowed stream. */
    private static final Time ALLOWED_LATENESS = Time.hours(1);

    /**
     * Builds and executes the streaming job.
     *
     * @param inputFilePath path of the CSV file; each line is {@code name,value,event_time} where
     *                      {@code event_time} is parsed with {@link Instant#parse(CharSequence)}
     * @throws Exception if job construction or execution fails
     */
    public void stream(String inputFilePath) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // GIVEN a SOURCE with a CSV input file
        // in which each line has a: String, double, Instant
        // THEN the MAP operator
        // transforms the line into a Tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // WHEN the map operation finishes
        // THEN the event time is assigned using field f3
        final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream =
                env.readTextFile(inputFilePath)
                        .map(line -> {
                            final String[] split = line.split(",");
                            final String name = split[0];
                            final double value = Double.parseDouble(split[1]);
                            final Instant timestamp = Instant.parse(split[2]);
                            // A raw sample is stored as a "window" of size 1 whose sum is the
                            // value itself and whose count is 1.
                            return Tuple7.of(name, 1, value, timestamp, value, 1, false);
                        })
                        .returns(Types.TUPLE(
                                Types.STRING,
                                Types.INT,
                                Types.DOUBLE,
                                TypeInformation.of(Instant.class),
                                Types.DOUBLE,
                                Types.INT,
                                Types.BOOLEAN))
                        .assignTimestampsAndWatermarks(new EventTimestampExtractor());

        final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink();

        // Raw samples go to the table unchanged.
        upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);

        // GIVEN a data stream with Tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // THEN the stream is KEYED BY: f0: name: String, f1: window_size: int
        // THEN the stream is WINDOWED into a tumbling event time window of 15 minutes
        // THEN the window is configured to allow elements late by 1 hour
        // THEN a low-level process function is applied to the window that
        // aggregates the time series by assigning the following tuple fields:
        // f1: window_size = 15 minutes in milliseconds
        // f2: value = average value in this 15 minute window
        // f3: event_timestamp = the first epoch millisecond in this 15 minute window
        // f4: aggregate_sum = sum of f2 values in this 15 minute window
        // f5: aggregate_count = number of values in this 15 minute window
        final SingleOutputStreamOperator<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>
                aggregateTimeSeriesStream = timeSeriesStream
                        .keyBy(0, 1)
                        .window(TumblingEventTimeWindows.of(WINDOW_SIZE))
                        .allowedLateness(ALLOWED_LATENESS)
                        .process(new WindowAverageFunction());

        upsertToJDBC(jdbcUpsertTableSink, aggregateTimeSeriesStream);

        env.execute("time series");
    }

    /**
     * Creates the upsert sink for the {@code time_series} table of the in-memory Derby database,
     * keyed on {@code (name, window_size, event_timestamp)}.
     *
     * @return the configured sink
     */
    private JDBCUpsertTableSink buildJdbcUpsertTableSink() {
        final JDBCOptions options = JDBCOptions.builder()
                .setDBUrl("jdbc:derby:memory:flink")
                .setTableName("time_series")
                .build();

        final TableSchema schema = TableSchema.builder()
                .field("name", DataTypes.VARCHAR(50).notNull())
                .field("window_size", DataTypes.INT().notNull())
                .field("value", DataTypes.DOUBLE().notNull())
                .field("event_timestamp", DataTypes.TIMESTAMP().notNull())
                .field("aggregate_sum", DataTypes.DOUBLE().notNull())
                .field("aggregate_count", DataTypes.INT().notNull())
                .field("is_backfill", DataTypes.BOOLEAN().notNull())
                .primaryKey("name", "window_size", "event_timestamp")
                .build();

        final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder()
                .setOptions(options)
                .setTableSchema(schema)
                .build();
        jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"});
        return jdbcUpsertTableSink;
    }

    /**
     * Converts every tuple of the stream into a 7-field {@link Row} (same field order as the
     * tuple, with f3 converted to {@link Timestamp}) and hands the result to the sink.
     *
     * @param jdbcUpsertTableSink sink that receives the rows
     * @param timeSeriesStream    stream of raw or aggregated time series tuples
     */
    private void upsertToJDBC(
            JDBCUpsertTableSink jdbcUpsertTableSink,
            DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) {

        jdbcUpsertTableSink.consumeDataStream(
                timeSeriesStream
                        .map(t -> {
                            final Row row = new Row(7);
                            row.setField(0, t.f0);
                            row.setField(1, t.f1);
                            row.setField(2, t.f2);
                            row.setField(3, Timestamp.from(t.f3));
                            row.setField(4, t.f4);
                            row.setField(5, t.f5);
                            row.setField(6, t.f6);
                            // The flag is always true here; this job never emits it as false.
                            return new Tuple2<>(true, row);
                        })
                        .returns(new TypeHint<Tuple2<Boolean, Row>>() {
                        }));
    }

    /**
     * Creates the in-memory Derby table, runs the streaming job on the given input file and then
     * logs every row of the table.
     *
     * @param args {@code args[0]} is the path of the CSV input file
     * @throws IllegalArgumentException if no input file path is supplied
     * @throws Exception                if the database or the job fails
     */
    public static void main(String[] args) throws Exception {

        logger.info("Command line arguments: {}", Arrays.toString(args));

        if (args.length < 1) {
            throw new IllegalArgumentException("Usage: TimeSeriesAverageApp <input-file-path>");
        }
        final String inputFilePath = args[0];

        logger.info("Reading input file: {}", inputFilePath);

        final String databaseURL = "jdbc:derby:memory:flink;create=true";

        // The connection is held open for the whole run, from table creation to the final query.
        try (final Connection con = DriverManager.getConnection(databaseURL)) {
            createTimeSeriesTable(con);

            final TimeSeriesAverageApp app = new TimeSeriesAverageApp();
            app.stream(inputFilePath);

            logTimeSeriesTable(con);
        }
    }

    /** Creates the {@code time_series} table on the given connection. */
    private static void createTimeSeriesTable(Connection con) throws Exception {
        try (final Statement stmt = con.createStatement()) {
            stmt.execute("CREATE TABLE time_series (\n" +
                    " id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" +
                    " name VARCHAR(50) NOT NULL,\n" +
                    " window_size INTEGER NOT NULL DEFAULT 1,\n" +
                    " event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                    " value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
                    " aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
                    " aggregate_count INTEGER NOT NULL DEFAULT 1,\n" +
                    " is_backfill BOOLEAN NOT NULL DEFAULT false,\n" +
                    " version INTEGER NOT NULL DEFAULT 1,\n" +
                    " create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                    " modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                    " UNIQUE (name, window_size, event_timestamp)\n" +
                    ")");
        }
    }

    /** Logs every row of the {@code time_series} table at INFO level. */
    private static void logTimeSeriesTable(Connection con) throws Exception {
        try (final Statement stmt = con.createStatement();
             final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series")) {
            while (rs.next()) {
                final long id = rs.getLong(1);
                final String name = rs.getString(2);
                final int window_size = rs.getInt(3);
                final Timestamp event_timestamp = rs.getTimestamp(4);
                final double value = rs.getDouble(5);
                final double aggregate_sum = rs.getDouble(6);
                final int aggregate_count = rs.getInt(7);
                final boolean is_backfill = rs.getBoolean(8);
                final int version = rs.getInt(9);
                final Timestamp create_time = rs.getTimestamp(10);
                final Timestamp modify_time = rs.getTimestamp(11);
                logger.info(
                        "id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"",
                        id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time
                );
            }
        }
    }

    /**
     * Uses tuple field f3 (the sample's {@link Instant}) as the element's event timestamp.
     *
     * <p>NOTE(review): an ascending extractor presumably expects timestamps to arrive in
     * non-decreasing order; verify that the input file satisfies this, or switch to an
     * extractor that tolerates out-of-order data.
     */
    private static final class EventTimestampExtractor
            extends AscendingTimestampExtractor<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> {

        private static final long serialVersionUID = 1L;

        @Override
        public long extractAscendingTimestamp(
                Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) {
            return element.f3.toEpochMilli();
        }
    }

    /**
     * Collapses all elements of one keyed window into a single aggregate tuple:
     * f0 = name, f1 = window size in milliseconds, f2 = average of the f2 values,
     * f3 = window start, f4 = sum of the f2 values, f5 = number of elements, f6 = false.
     */
    private static final class WindowAverageFunction extends ProcessWindowFunction<
            Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>,
            Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>,
            Tuple,
            TimeWindow> {

        private static final long serialVersionUID = 1L;

        @Override
        public void process(
                Tuple tuple,
                Context context,
                Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements,
                Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
        ) throws Exception {

            String name = null;
            double sum = 0D;
            int count = 0;
            // Only the sum and count are needed, so iterate directly instead of copying
            // and reversing the window contents.
            for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : elements) {
                name = element.f0;
                sum += element.f2;
                count++;
            }

            if (count == 0) {
                // Nothing to average; emitting a tuple with null fields would only fail downstream.
                return;
            }

            final Instant windowStart = Instant.ofEpochMilli(context.window().getStart());
            final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation =
                    Tuple7.of(name, WINDOW_SIZE_MILLIS, sum / count, windowStart, sum, count, false);
            out.collect(aggregation);
        }
    }
}