I replaced the terminal ".map(...)" and ".print()" calls with:
.executeAndCollect()
.forEachRemaining(System.out::println);

The warnings were replaced with:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
com.twitter.chill.java.ArraysAsListSerializer
(file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar)
to field java.util.Arrays$ArrayList.a
WARNING: Please consider reporting this to the maintainers of
com.twitter.chill.java.ArraysAsListSerializer
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
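
For context, here is a minimal sketch in plain Java (no Flink or chill involved) of the kind of access the warning describes: reflection reaching the private field "a" of java.util.Arrays$ArrayList. The class name ReflectiveAccessDemo and the method probe() are made up for illustration. As far as I understand, JDK 9 through 15 permit this by default and print the warning above, while newer JDKs refuse it unless the package is opened with --add-opens.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;

public class ReflectiveAccessDemo {

    // Tries to read the private backing array of the list returned by Arrays.asList.
    // Returns "accessible:<length>" if the JDK allows it, otherwise "denied:<exception>".
    static String probe() {
        List<String> list = Arrays.asList("a", "b");
        try {
            Field field = list.getClass().getDeclaredField("a");
            // This call is what triggers the warning (or the denial) on JDK 9+.
            field.setAccessible(true);
            Object[] backing = (Object[]) field.get(list);
            return "accessible:" + backing.length;
        } catch (RuntimeException | ReflectiveOperationException e) {
            // Newer JDKs throw InaccessibleObjectException from setAccessible.
            return "denied:" + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(probe());
    }
}
```

The outcome depends on the JDK running it, so the sketch reports whichever case it hits rather than assuming one.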

On Tue, Nov 29, 2022 at 3:25 PM Curtis Jensen <curtis.jen...@gmail.com> wrote:
>
> Hello,
>
> Using Flink version 1.15.0, I receive these warnings when trying a
> small example (code below):
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by
> org.apache.flink.api.java.ClosureCleaner
> (file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar)
> to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of
> org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further
> illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
>
> I am undoubtedly doing something incorrectly, but felt that it may be
> useful to take the advice "Please consider reporting this to the
> maintainers of org.apache.flink.api.java.ClosureCleaner".
> Also, any corrections to my example would be appreciated.
>
> Thanks,
> Curtis
>
>
>
>
> AvgAmount.java
>
> import org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.KeyedStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class AvgAmount {
>
>   public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>     DataStream<ExampleData.PurchaseEvent> purchaseStream =
>         env.fromElements(ExampleData.PURCHASE_EVENTS);
>     // Parameterize KeyedStream; the raw type discards the element and key types.
>     KeyedStream<ExampleData.PurchaseEvent, String> keyedPurchaseStream =
>         purchaseStream.keyBy(event -> event.account_id);
>     keyedPurchaseStream.process(new PurchaseEventProcessor())
>         .map(stats -> stats.toString())
>         .print();
>
>     // print() only registers a sink; the job runs when execute() is called.
>     env.execute();
>   }
>
>   public static class PurchaseStats {
>     public String accountId;
>     public long amountSum;
>     public long amountCount;
>
>     public PurchaseStats(String accountId) {
>       this.accountId = accountId;
>     }
>
>     public void addAmount(long amount) {
>       amountSum += amount;
>       amountCount += 1;
>     }
>
>     @Override
>     public String toString() {
>       return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}",
>           accountId, (double) amountSum / (double) amountCount);
>     }
>   }
>
>   public static class PurchaseEventProcessor
>       extends KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats> {
>     ValueState<PurchaseStats> seen;
>
>     @Override
>     public void open(Configuration parameters) {
>       seen = getRuntimeContext().getState(
>           new ValueStateDescriptor<>("seen", PurchaseStats.class));
>     }
>
>     @Override
>     public void processElement(
>         ExampleData.PurchaseEvent purchaseEvent,
>         KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats>.Context context,
>         Collector<PurchaseStats> out) throws Exception {
>       PurchaseStats currentStats = seen.value();
>       if (currentStats == null) {
>         currentStats = new PurchaseStats(purchaseEvent.account_id);
>       }
>
>       currentStats.addAmount(purchaseEvent.amount);
>
>       seen.update(currentStats);
>       out.collect(currentStats);
>     }
>   }
> }
>
> ExampleData.java
>
> import java.time.Instant;
>
> public class ExampleData {
>     public static final PurchaseEvent[] PURCHASE_EVENTS =
>             new PurchaseEvent[] {
>                     new PurchaseEvent("1337Gamer", "192.168.0.1", 1000),
>                     new PurchaseEvent("1337", "127.0.0.1", 1000),
>                     new PurchaseEvent("1337", "127.0.0.2", 100),
>                     new PurchaseEvent("1337", "127.0.0.1", 9900)
>             };
>
>     public static class PurchaseEvent {
>         public long timestamp;
>         public String account_id;
>         public String ip;
>         public long amount;
>
>         public PurchaseEvent() { }
>
>         public PurchaseEvent(String accountId, String ip, long amount) {
>             this(Instant.now().getEpochSecond(), accountId, ip, amount);
>         }
>
>         public PurchaseEvent(long timestamp, String accountId, String ip,
>                 long amount) {
>             this.timestamp = timestamp;
>             this.account_id = accountId;
>             this.ip = ip;
>             this.amount = amount;
>         }
>     }
> }
