Unsubscribe

2021-10-29 Thread Jensen
Unsubscribe

Re:Re: Schema Evolution & Json Schemas

2024-03-10 Thread Jensen
Unsubscribe

At 2024-02-26 20:55:19, "Salva Alcántara"  wrote:

Awesome Andrew, thanks a lot for the info!


On Sun, Feb 25, 2024 at 4:37 PM Andrew Otto  wrote:

>  the following code generator
Oh, and FWIW we avoid code generation and POJOs, and instead rely on Flink's 
Row or RowData abstractions.
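
Not from the original message, but as an illustration of that Row-based
approach (a minimal sketch, assuming Flink's named-Row API):

import org.apache.flink.types.Row;

// A named Row carries its fields by name at runtime, so an upstream producer
// adding a field does not force regenerating and redeploying POJO classes.
Row event = Row.withNames();
event.setField("account_id", "1337");
event.setField("amount", 1000L);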


On Sun, Feb 25, 2024 at 10:35 AM Andrew Otto  wrote:

Hi! 


I'm not sure if this is totally relevant for you, but we use JSONSchema and 
JSON with Flink at the Wikimedia Foundation. 
We explicitly disallow the use of additionalProperties, unless it is to define 
Map-type fields (where additionalProperties itself is a schema).
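
For illustration (not from the original message), such a map-typed field can
be declared on the Flink side with the standard Types factory, e.g.:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

// "meta" models a JSONSchema object whose additionalProperties is itself a
// string schema, i.e. an open string -> string map.
TypeInformation<Row> typeInfo = Types.ROW_NAMED(
    new String[] {"meta"},
    Types.MAP(Types.STRING, Types.STRING));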


We have JSONSchema converters and JSON Serdes to be able to use our JSONSchemas 
and JSON records with both the DataStream API (as Row) and Table API (as 
RowData).


See:
- https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json
- https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/#managing-a-object


State schema evolution is supported via the EventRowTypeInfo wrapper.


Less directly about Flink: I gave a talk at Confluent's Current conf in 2022 
about why we use JSONSchema. See also this blog post series if you are 
interested!


-Andrew Otto
 Wikimedia Foundation




On Fri, Feb 23, 2024 at 1:58 AM Salva Alcántara  wrote:

I'm facing some issues related to schema evolution in combination with the 
use of JSON Schemas, and I was wondering whether there are any 
recommended best practices.


In particular, I'm using the following code generator:


- https://github.com/joelittlejohn/jsonschema2pojo



The main gotchas so far relate to the `additionalProperties` field. When it is 
set to true, the resulting POJO is not valid according to Flink's rules, because 
the generated getter/setter methods don't follow the JavaBeans naming 
conventions; e.g., see here:


- https://github.com/joelittlejohn/jsonschema2pojo/issues/1589


This means that the Kryo fallback is used for serialization purposes, which is 
not only bad for performance but also breaks state schema evolution.
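
A quick way to check for this fallback (not from the original message;
`MyGeneratedPojo` is a placeholder for any jsonschema2pojo output):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class PojoCheck {
    public static void main(String[] args) {
        // If Flink's analyzer rejects the class as a POJO, it wraps it in
        // GenericTypeInfo, which serializes with Kryo.
        TypeInformation<MyGeneratedPojo> info = TypeInformation.of(MyGeneratedPojo.class);
        System.out.println("Falls back to Kryo: " + (info instanceof GenericTypeInfo));
    }
}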


Because of that, setting `additionalProperties` to `false` looks like a 
good idea, but then your job will break if an upstream/producer service adds a 
property to the messages you are reading. To solve this, the POJOs for 
your job (as a reader) can be generated to ignore the `additionalProperties` 
field (via the `@JsonIgnore` Jackson annotation), as sketched below. This seems 
to be a good overall solution, but it looks a bit convoluted to me and didn't 
come without some trial & error (= pain & frustration).
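
For illustration, a hand-written sketch of a reader-side POJO in this spirit;
it uses Jackson's `@JsonIgnoreProperties(ignoreUnknown = true)` (a close cousin
of the `@JsonIgnore` route above), and all names here are hypothetical:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

// Unknown upstream fields are silently dropped at deserialization time, so no
// additionalProperties map is needed and the class keeps plain bean-style
// accessors that Flink's POJO serializer accepts.
@JsonIgnoreProperties(ignoreUnknown = true)
public class UpstreamEvent {
    private String accountId;
    private long amount;

    public UpstreamEvent() { }

    public String getAccountId() { return accountId; }
    public void setAccountId(String accountId) { this.accountId = accountId; }

    public long getAmount() { return amount; }
    public void setAmount(long amount) { this.amount = amount; }
}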


Is there anyone here facing similar issues? It would be good to hear your 
thoughts on this!


BTW, this is a very interesting article that touches on the above-mentioned 
difficulties:
- https://www.creekservice.org/articles/2024/01/09/json-schema-evolution-part-2.html



"An illegal reflective access operation has occurred" during KeyedStream process

2022-11-29 Thread Curtis Jensen
Hello,

Using Flink version 1.15.0, I receive these warnings when trying a
small example (code below):
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
org.apache.flink.api.java.ClosureCleaner
(file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/org/apache/flink/flink-core/1.15.0/flink-core-1.15.0.jar)
to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of
org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

I am undoubtedly doing something incorrectly, but felt that it may be
useful to take the advice "Please consider reporting this to the
maintainers of org.apache.flink.api.java.ClosureCleaner".
Also, any corrections to my example would be appreciated.

Thanks,
Curtis




AvgAmount.java

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class AvgAmount {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    DataStream<ExampleData.PurchaseEvent> purchaseStream =
        env.fromElements(ExampleData.PURCHASE_EVENTS);
    KeyedStream<ExampleData.PurchaseEvent, String> keyedPurchaseStream =
        purchaseStream.keyBy(event -> event.account_id);
    keyedPurchaseStream.process(new PurchaseEventProcessor())
        .map(stats -> stats.toString())
        .print();

    // The pipeline is only declared above; it does not run without this call.
    env.execute("AvgAmount");
  }

  public static class PurchaseStats {
    public String accountId;
    public long amountSum;
    public long amountCount;

    // A public no-arg constructor is required for Flink to treat this class
    // as a POJO instead of falling back to Kryo serialization.
    public PurchaseStats() { }

    public PurchaseStats(String accountId) {
      this.accountId = accountId;
    }

public void addAmount(long amount) {
  amountSum += amount;
  amountCount += 1;
}

@Override
public String toString() {
  return String.format("{\"account_id\":\"%s\",\"avg_amount\":%f}",
accountId, (double)amountSum/(double)amountCount);
}
  }

  public static class PurchaseEventProcessor extends
      KeyedProcessFunction<String, ExampleData.PurchaseEvent, PurchaseStats> {

    ValueState<PurchaseStats> seen;

    @Override
    public void open(Configuration parameters) {
      seen = getRuntimeContext().getState(new
          ValueStateDescriptor<>("seen", PurchaseStats.class));
    }

    @Override
    public void processElement(ExampleData.PurchaseEvent purchaseEvent,
        Context context, Collector<PurchaseStats> out) throws Exception {
  PurchaseStats currentStats = seen.value();
  if (currentStats == null) {
currentStats = new PurchaseStats(purchaseEvent.account_id);
  }

  currentStats.addAmount(purchaseEvent.amount);

  seen.update(currentStats);
  out.collect(currentStats);
}
  }
}

ExampleData.java

import java.time.Instant;

public class ExampleData {
public static final PurchaseEvent[] PURCHASE_EVENTS =
new PurchaseEvent[] {
new PurchaseEvent("1337Gamer", "192.168.0.1", 1000),
new PurchaseEvent("1337", "127.0.0.1", 1000),
new PurchaseEvent("1337", "127.0.0.2", 100),
new PurchaseEvent("1337", "127.0.0.1", 9900)
};

public static class PurchaseEvent {
public long timestamp;
public String account_id;
public String ip;
public long amount;

public PurchaseEvent() { }

public PurchaseEvent(String accountId, String ip, long amount) {
this(Instant.now().getEpochSecond(), accountId, ip, amount);
}

public PurchaseEvent(long timestamp, String accountId, String
ip, long amount) {
this.timestamp = timestamp;
this.account_id = accountId;
this.ip = ip;
this.amount = amount;
}
}
}


Re: "An illegal reflective access operation has occurred" during KeyedStream process

2022-11-29 Thread Curtis Jensen
I changed the ".map(...)" and ".print()" terminal statements to:
.executeAndCollect()
.forEachRemaining(System.out::println);

The warnings were replaced with:
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by
com.twitter.chill.java.ArraysAsListSerializer
(file:/Users/cjensen/data/tools/apache-maven-3.3.9/m2/com/twitter/chill-java/0.7.6/chill-java-0.7.6.jar)
to field java.util.Arrays$ArrayList.a
WARNING: Please consider reporting this to the maintainers of
com.twitter.chill.java.ArraysAsListSerializer
WARNING: Use --illegal-access=warn to enable warnings of further
illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
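
Not from the thread, but these chill/Kryo warnings suggest a Kryo fallback is
happening somewhere; a sketch of how to surface such fallbacks as hard errors
during development, using Flink's standard ExecutionConfig:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Fail at job construction time whenever a type would fall back to Kryo,
// instead of serializing it silently (and slowly).
env.getConfig().disableGenericTypes();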


Multiple Window Streams to same Kinesis Sink

2023-01-27 Thread Curtis Jensen
I'm trying to sink two Window Streams to the same Kinesis Sink.  When
I do this, no results make it to the sink (code below).  If I
remove one of the windows from the job, results do get published;
attaching a second stream to the sink seems to stop output from both.

How can I have results from both Window Streams go to the same sink?

Thanks


public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();

ObjectMapper jsonParser = new ObjectMapper();

DataStream<String> inputStream = createKinesisSource(env);
FlinkKinesisProducer<String> kinesisSink = createKinesisSink();

WindowedStream<JsonNode, JsonNode, TimeWindow> oneMinStream = inputStream
.map(value -> jsonParser.readValue(value, JsonNode.class))
.keyBy(node -> node.get("accountId"))
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)));

oneMinStream
.aggregate(new LoginAggregator("k1m"))
.addSink(kinesisSink);

WindowedStream<JsonNode, JsonNode, TimeWindow> twoMinStream = inputStream
.map(value -> jsonParser.readValue(value, JsonNode.class))
.keyBy(node -> node.get("accountId"))
.window(TumblingProcessingTimeWindows.of(Time.minutes(2)));

twoMinStream
.aggregate(new LoginAggregator("k2m"))
.addSink(kinesisSink);

try {
env.execute("Flink Kinesis Streaming Sink Job");
} catch (Exception e) {
// Log the full exception; e.getStackTrace().toString() would only print
// the array reference, not the actual stack trace.
LOG.error("Flink Kinesis Streaming Sink Job failed", e);
throw e;
}
}


private static DataStream<String>
createKinesisSource(StreamExecutionEnvironment env) {
Properties inputProperties = new Properties();
inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"LATEST");
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
new SimpleStringSchema(), inputProperties));
}

private static FlinkKinesisProducer<String> createKinesisSink() {
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
outputProperties.setProperty("AggregationEnabled", "false");

FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new
SimpleStringSchema(), outputProperties);
sink.setDefaultStream(outputStreamName);
sink.setDefaultPartition(UUID.randomUUID().toString());

return sink;
}
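
Not part of the original message, but one possible sketch: assuming both
aggregations emit the same element type (String here, since the sink uses
SimpleStringSchema), the two result streams can be unioned and the sink
attached exactly once:

// Merge the two windowed result streams, then add the single sink once.
DataStream<String> oneMinResults = oneMinStream.aggregate(new LoginAggregator("k1m"));
DataStream<String> twoMinResults = twoMinStream.aggregate(new LoginAggregator("k2m"));

oneMinResults.union(twoMinResults).addSink(kinesisSink);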