Hi to all,
I was trying to port another job we have that use dataset API to datastream.
The legacy program was doing basically a dataset.mapPartition().reduce() so
I tried to replicate this thing with a
final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
final DataStream<Row> input = env.fromElements(//
Row.of(1.0), //
Row.of(2.0), //
Row.of(3.0), //
Row.of(5.0), //
Row.of(6.0)).returns(new RowTypeInfo(columnType));
inputStream.map(new SubtaskIndexAssigner(columnType))
.keyBy(t -> t.f0)
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5),
100L))).
.process(..)
Unfortunately the program exits before reaching the Process function
(moreover I need to add another window + trigger after it before adding the
reduce function).
Is there a way to do this with the DataStream API or should I still use
DataSet API for the moment (when the batch will be fully supported)? I
append to the footer all the code required to test the job.
Best,
Flavio
-----------------------------------------------------------------
package org.apache.flink.stats.sketches;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.io.PrintingOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import
org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setParallelism(1);
final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
final DataStream<Row> input = env.fromElements(//
Row.of(1.0), //
Row.of(2.0), //
Row.of(3.0), //
Row.of(5.0), //
Row.of(6.0)).returns(new RowTypeInfo(columnType));
final DataStream<Row> out = input.map(new
SubtaskIndexAssigner(columnType))//
.keyBy(t -> t.f0)//
.window(GlobalWindows.create())
.trigger(PurgingTrigger.of(new
CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
.process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row,
Integer, GlobalWindow>() {
@Override
public void process(Integer key,
ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer,
GlobalWindow>.Context context,
Iterable<Tuple2<Integer, Row>> it, Collector<Row> out) throws
Exception {
for (Tuple2<Integer, Row> tuple : it) {
out.collect(Row.of(tuple.f1.getField(0).toString()));
}
}
}).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
env.execute();
}
private static final class SubtaskIndexAssigner extends
RichMapFunction<Row, Tuple2<Integer, Row>>
implements ResultTypeQueryable<Tuple2<Integer, Row>> {
private static final long serialVersionUID = 1L;
private int myTaskId;
private TypeInformation<?> columnType;
public SubtaskIndexAssigner(TypeInformation<?> columnType) {
this.columnType = columnType;
}
@Override
public void open(Configuration parameters) throws Exception {
this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
}
@Override
public Tuple2<Integer, Row> map(Row row) throws Exception {
return Tuple2.of(myTaskId, row);
}
@Override
public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
return new TupleTypeInfo<Tuple2<Integer,
Row>>(BasicTypeInfo.INT_TYPE_INFO,
new RowTypeInfo(columnType));
}
}
private static class CountWithTimeoutTriggerPartition
extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
private static final long serialVersionUID = 1L;
private final long maxCount;
private final long maxTime;
private final ReducingStateDescriptor<Long> countstateDesc =
new ReducingStateDescriptor<>("count", new Sum(),
LongSerializer.INSTANCE);
private final ReducingStateDescriptor<Long> timestateDesc =
new ReducingStateDescriptor<>("fire-time", new Min(),
LongSerializer.INSTANCE);
public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
this.maxCount = maxCount;
this.maxTime = maxTime;
}
public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
this(maxTime.toMilliseconds(), maxCount);
}
@Override
public TriggerResult onElement(Tuple2<Integer, Row> element, long
timestamp,
GlobalWindow window,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
ctx)
throws Exception {
ReducingState<Long> fireTimestamp =
ctx.getPartitionedState(timestateDesc);
timestamp = ctx.getCurrentProcessingTime();
if (fireTimestamp.get() == null) {
long start = timestamp - (timestamp % maxTime);
long nextFireTimestamp = start + maxTime;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
fireTimestamp.add(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
count.add(1L);
if (count.get() >= maxCount) {
count.clear();
fireTimestamp.clear();
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window,
TriggerContext ctx)
throws Exception {
ReducingState<Long> fireTimestamp =
ctx.getPartitionedState(timestateDesc);
ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
if (fireTimestamp.get().equals(time)) {
count.clear();
fireTimestamp.clear();
fireTimestamp.add(time + maxTime);
ctx.registerProcessingTimeTimer(time + maxTime);
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(@SuppressWarnings("unused") long time,
@SuppressWarnings("unused") GlobalWindow window,
@SuppressWarnings("unused") TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws
Exception {
ReducingState<Long> fireTimestamp =
ctx.getPartitionedState(timestateDesc);
long timestamp = fireTimestamp.get();
ctx.deleteProcessingTimeTimer(timestamp);
fireTimestamp.clear();
ctx.getPartitionedState(countstateDesc).clear();
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(GlobalWindow window, OnMergeContext ctx) {
ctx.mergePartitionedState(countstateDesc);
ctx.mergePartitionedState(timestateDesc);
}
class Sum implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return value1 + value2;
}
}
class Min implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
@Override
public Long reduce(Long value1, Long value2) throws Exception {
return Math.min(value1, value2);
}
}
}
}