Multiple Window Streams to same Kinesis Sink
I'm trying to sink two Window Streams to the same Kinesis Sink. When I do this, no results are making it to the sink (code below). If I remove one of the windows from the Job, results do get published. Adding another stream to the sink seems to void both. How can I have results from both Window Streams go to the same sink? Thanks public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); ObjectMapper jsonParser = new ObjectMapper(); DataStream inputStream = createKinesisSource(env); FlinkKinesisProducer kinesisSink = createKinesisSink(); WindowedStream oneMinStream = inputStream .map(value -> jsonParser.readValue(value, JsonNode.class)) .keyBy(node -> node.get("accountId")) .window(TumblingProcessingTimeWindows.of(Time.minutes(1))); oneMinStream .aggregate(new LoginAggregator("k1m")) .addSink(kinesisSink); WindowedStream twoMinStream = inputStream .map(value -> jsonParser.readValue(value, JsonNode.class)) .keyBy(node -> node.get("accountId")) .window(TumblingProcessingTimeWindows.of(Time.minutes(2))); twoMinStream .aggregate(new LoginAggregator("k2m")) .addSink(kinesisSink); try { env.execute("Flink Kinesis Streaming Sink Job"); } catch (Exception e) { LOG.error("failed"); LOG.error(e.getLocalizedMessage()); LOG.error(e.getStackTrace().toString()); throw e; } } private static DataStream createKinesisSource(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static FlinkKinesisProducer createKinesisSink() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); 
outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition(UUID.randomUUID().toString()); return sink; }
Re: "An illegal reflective access operation has occurred" during KeyedStream process
I changed the ".map(...)" and ".print()" terminal statement to: .executeAndCollect() .forEachRemaining(System.out::println); The warnings were replaced with: WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by com.twitter.chill.java.ArraysAsListSerializer (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar) to field java.util.Arrays$ArrayList.a WARNING: Please consider reporting this to the maintainers of com.twitter.chill.java.ArraysAsListSerializer WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release On Tue, Nov 29, 2022 at 3:25 PM Curtis Jensen wrote: > > Hello, > > Using Flink version 1.15.0, I receive these warnings when trying a > small example (code below): > WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by > org.apache.flink.api.java.ClosureCleaner > (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar) > to field java.lang.String.value > WARNING: Please consider reporting this to the maintainers of > org.apache.flink.api.java.ClosureCleaner > WARNING: Use --illegal-access=warn to enable warnings of further > illegal reflective access operations > WARNING: All illegal access operations will be denied in a future release > > I am undoubtedly doing something incorrectly, but felt that it may be > useful to take the advice "Please consider reporting this to the > maintainers of org.apache.flink.api.java.ClosureCleaner". > Also, any corrections to my example would be appreciated. 
> > Thanks, > Curtis > > > > > AvgAmount.java > > import org.apache.flink.api.common.RuntimeExecutionMode; > import org.apache.flink.api.common.state.ValueState; > import org.apache.flink.api.common.state.ValueStateDescriptor; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.datastream.KeyedStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.KeyedProcessFunction; > import org.apache.flink.util.Collector; > > public class AvgAmount { > > public static void main(String[] args) { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > > DataStream purchaseStream = > env.fromElements(ExampleData.PURCHASE_EVENTS); > KeyedStream keyedPurchaseStream = purchaseStream.keyBy(event -> > event.account_id); > keyedPurchaseStream.process(new PurchaseEventProcessor()) > .map(stats -> stats.toString()) > .print(); > } > > public static class PurchaseStats { > public String accountId; > public long amountSum; > public long amountCount; > > public PurchaseStats(String accountId) { > this.accountId = accountId; > } > > public void addAmount(long amount) { > amountSum += amount; > amountCount += 1; > } > > @Override > public String toString() { > return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}", > accountId, (double)amountSum/(double)amountCount); > } > } > > public static class PurchaseEventProcessor extends > KeyedProcessFunction > { > ValueState seen; > > @Override > public void open(Configuration parameters) { > seen = getRuntimeContext().getState(new > ValueStateDescriptor<>("seen", PurchaseStats.class)); > } > > @Override > public void processElement(ExampleData.PurchaseEvent > purchaseEvent, KeyedProcessFunction PurchaseStats>.Context context, Collector out) throws > Exception { > 
PurchaseStats currentStats = seen.value(); > if (currentStats == null) { > currentStats = new PurchaseStats(purchaseEvent.account_id); > } > > currentStats.addAmount(purchaseEvent.amount); > > seen.update(currentStats); > out.collect(currentStats); > } > } > } > > ExampleData.java > > import org.apache.flink.types.Row; > import org.apache.flink.types.RowKind; > > import java.time.Instant; > > public class ExampleData { > public static final PurchaseEvent[] PURCHASE_EVENTS = > new PurchaseEvent[] { > new PurchaseEvent("1337Gamer", "192.168.0.1" ...
"An illegal reflective access operation has occurred" during KeyedStream process
Hello, Using Flink version 1.15.0, I receive these warnings when trying a small example (code below): WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar) to field java.lang.String.value WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release I am undoubtedly doing something incorrectly, but felt that it may be useful to take the advice "Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner". Also, any corrections to my example would be appreciated. Thanks, Curtis AvgAmount.java import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; public class AvgAmount { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); DataStream purchaseStream = env.fromElements(ExampleData.PURCHASE_EVENTS); KeyedStream keyedPurchaseStream = purchaseStream.keyBy(event -> event.account_id); keyedPurchaseStream.process(new PurchaseEventProcessor()) .map(stats -> stats.toString()) .print(); } public static class PurchaseStats { public String 
accountId; public long amountSum; public long amountCount; public PurchaseStats(String accountId) { this.accountId = accountId; } public void addAmount(long amount) { amountSum += amount; amountCount += 1; } @Override public String toString() { return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}", accountId, (double)amountSum/(double)amountCount); } } public static class PurchaseEventProcessor extends KeyedProcessFunction { ValueState seen; @Override public void open(Configuration parameters) { seen = getRuntimeContext().getState(new ValueStateDescriptor<>("seen", PurchaseStats.class)); } @Override public void processElement(ExampleData.PurchaseEvent purchaseEvent, KeyedProcessFunction.Context context, Collector out) throws Exception { PurchaseStats currentStats = seen.value(); if (currentStats == null) { currentStats = new PurchaseStats(purchaseEvent.account_id); } currentStats.addAmount(purchaseEvent.amount); seen.update(currentStats); out.collect(currentStats); } } } ExampleData.java import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import java.time.Instant; public class ExampleData { public static final PurchaseEvent[] PURCHASE_EVENTS = new PurchaseEvent[] { new PurchaseEvent("1337Gamer", "192.168.0.1", 1000), new PurchaseEvent("1337", "127.0.0.1", 1000), new PurchaseEvent("1337", "127.0.0.2", 100), new PurchaseEvent("1337", "127.0.0.1", 9900) }; public static class PurchaseEvent { public long timestamp; public String account_id; public String ip; public long amount; public PurchaseEvent() { } public PurchaseEvent(String accountId, String ip, long amount) { this(Instant.now().getEpochSecond(), accountId, ip, amount); } public PurchaseEvent(long timestamp, String accountId, String ip, long amount) { this.timestamp = timestamp; this.account_id = accountId; this.ip = ip; this.amount = amount; } } }