Thanks again Timo, I hope I replied correctly this time.

As per my previous message the Sensor class is a very simple POJO type (I
think).

When the serialization trace mentions PostgreSQL classes, it makes me think
that something from my operator is being included in serialization, not just
the Sensor object itself, which I am explicitly putting in state.

package sensingfeeling.functions.mapping;

public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {

    private static final long serialVersionUID = 8582433437601788991L;

    private transient ValueState<Sensor> sensorState;

    @Override
    public TypeA join(TypeB frame, TypeC activeMotionPaths) throws Exception {

        Sensor sensor = sensorState.value();
        if (sensor == null) {
            LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
            sensor = getSensor(frame);
            sensorState.update(sensor);
        }

        return new TypeA();
    }

    @Override
    public void open(Configuration config) {
        LOG.debug("Sensor open method called", config);

        StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
                .cleanupInBackground()
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();

        ValueStateDescriptor<Sensor> sensorStateDescriptor = new ValueStateDescriptor<>(
                "sensor", TypeInformation.of(new TypeHint<Sensor>() {}));
        // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
        sensorState = getRuntimeContext().getState(sensorStateDescriptor);
    }

    private Sensor getSensor(TypeB frame) throws Exception {

        Class.forName("org.postgresql.Driver");
        try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
                Statement st = con.createStatement();
                ResultSet rs = st.executeQuery(
                        "SELECT * from sensor where sensorid = '" + frame.sensorId + "'")) {

            if (rs.next()) {
                Sensor sensor = new Sensor() {};

                LOG.debug("Got sensor: " + sensor);

                return sensor;
            }

        } catch (SQLException ex) {
            LOG.error("Error when connecting to Postgres", ex);
            throw ex;
        }

        return null;
    }

}

Above is a cut-down version of my operator. I'm guessing it is the
ResultSet rs that is getting serialized. How do I prevent this undesirable
behaviour? I'm quite happy for my solution to serialize only what I
explicitly tell it to; I don't need exactly-once or anything.
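
For illustration, here is a minimal, self-contained sketch of one way a local
variable such as rs can end up inside a stored object. An anonymous subclass
written as `new Sensor() {{ ... }}` that reads from a local variable keeps a
hidden reference to it, and javac names that hidden field val$<variable>,
which matches the `val$rs` entry at the bottom of the serialization trace.
The cut-down operator above only shows an empty `new Sensor() {}` body, so
whether the full code populates its fields this way is an assumption. `Row`,
`buildAnonymous`, `buildPlain` and `declaredFieldNames` are hypothetical names
used only here, and `Row` stands in for the JDBC ResultSet so that no database
is needed.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

class AnonymousCaptureDemo {

    // Simplified stand-in for the Sensor model class.
    public static class Sensor {
        public String sensorId;
        public String label;
    }

    // Hypothetical stand-in for a JDBC ResultSet, so no database is needed.
    public static class Row {
        public String getString(String column) {
            return column + "-value";
        }
    }

    // Anonymous subclass with an initializer block that reads from 'rs'.
    // The compiler stores the captured local in a synthetic field.
    public static Sensor buildAnonymous(final Row rs) {
        return new Sensor() {{
            sensorId = rs.getString("sensorid");
            label = rs.getString("label");
        }};
    }

    // Plain instance: values are copied out and 'rs' is not retained.
    public static Sensor buildPlain(Row rs) {
        Sensor sensor = new Sensor();
        sensor.sensorId = rs.getString("sensorid");
        sensor.label = rs.getString("label");
        return sensor;
    }

    // Names of the fields declared directly on the object's runtime class,
    // including compiler-generated (synthetic) ones.
    public static List<String> declaredFieldNames(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            names.add(f.getName());
        }
        return names;
    }
}
```

Calling `declaredFieldNames` on the result of `buildAnonymous` lists the
captured variable, while the result of `buildPlain` is an ordinary `Sensor`
with only its declared fields. An anonymous subclass is also neither a
top-level class nor a static nested class. Whether building the object the
plain way resolves the error in the real job has not been verified here.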

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 12:19, Timo Walther <twal...@apache.org> wrote:

> Hi Chris,
>
> [forwarding the private discussion to the mailing list again]
>
> first of all, are you sure that your Sensor class is either a top-level
> class or a static inner class? It seems there is way more stuff
> in it (maybe included by accident transitively?), such as:
>
> org.apache.logging.log4j.core.layout.AbstractCsvLayout
>       > Serialization trace:
>       > classes (sun.misc.Launcher$AppClassLoader)
>       > classloader (java.security.ProtectionDomain)
>       > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>       > combiner (java.security.AccessControlContext)
>       > acc (sun.security.ssl.SSLSocketImpl)
>       > connection (org.postgresql.core.PGStream)
>       > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>       > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>       > commitQuery (org.postgresql.jdbc.PgConnection)
>       > connection (org.postgresql.jdbc.PgResultSet)
>       > val$rs
>
> When declaring state you can use
> `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class<T>)` to
> check if your state is a POJO type.
>
> Regards,
> Timo
>
>
> -------- Forwarded Message --------
> Subject:        Re: Updating ValueState not working in hosted Kinesis
> Date:   Wed, 19 Feb 2020 12:02:16 +0000
> From:   Chris Stevens <ch...@sensingfeeling.com>
> To:     Timo Walther <twal...@apache.org>
>
>
>
> Hi Timo,
>
> Thanks for your reply. This makes sense to me. How do I treat something
> as a POJO instead of a generic serialized black-box type? Sorry, I'm
> relatively new to Java and Flink.
>
> This is my full class def:
>
> package sensingfeeling.models;
> import java.io.Serializable;
>
> public class Sensor implements Serializable {
>
>       private static final long serialVersionUID = 8582433437601788991L;
>       public String sensorId;
>       public String companyId;
>       public String label;
>       // public Date createdAt;
>       // public Date updatedAt;
>       public Integer uncomfortableFaceLimit;
>       public Boolean online;
>       public String capabilityId;
>       // public Date lastOnlineAt;
>       // public Date lastOfflineAt;
>       public Integer onlineVersionNumber;
>       public int status;
>       @Override
>       public String toString(){
>           return this.sensorId + " - " + this.label;
>       }
> }
>
> Super simple really.
>
> I'm not trying to upgrade anything as far as I know. I'm just making an
> operator state-aware.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
>
>
> On Wed, 19 Feb 2020 at 11:55, Timo Walther <twal...@apache.org> wrote:
>
>      Hi Chris,
>
>      it seems there are fields serialized into state that actually don't
>      belong there. You should aim to treat Sensor as a POJO instead of a
>      Kryo generic serialized black-box type.
>
>      Furthermore, it seems that a field such as
>      "org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
>      in state. Is there a "transient" keyword missing?
>
>      Are you trying to upgrade your job or the Flink version?
>
>      Regards,
>      Timo
>
>
>
>      On 18.02.20 18:59, Chris Stevens wrote:
>       > Hi there,
>       >
>       > I'm trying to update state in one of my applications hosted in
>       > Kinesis Data Analytics.
>       >
>       > private transient ValueState<Sensor> sensorState;
>       > using sensorState.update(sensor);
>       >
>       > Get error:
>       >
>       > An error occurred: org.apache.flink.util.FlinkRuntimeException: Error while adding data to RocksDB
>       > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
>       > at org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
>       > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
>       > at sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
>       > at org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
>       > at org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
>       > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>       > at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>       > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
>       > at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
>       > at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
>       > at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
>       > at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
>       > at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
>       > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
>       > at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
>       > at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
>       > at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>       > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)
>       > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
>       > at java.lang.Thread.run(Thread.java:748)
>       > Caused by: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
>       > Serialization trace:
>       > classes (sun.misc.Launcher$AppClassLoader)
>       > classloader (java.security.ProtectionDomain)
>       > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>       > combiner (java.security.AccessControlContext)
>       > acc (sun.security.ssl.SSLSocketImpl)
>       > connection (org.postgresql.core.PGStream)
>       > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>       > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>       > commitQuery (org.postgresql.jdbc.PgConnection)
>       > connection (org.postgresql.jdbc.PgResultSet)
>       > val$rs (sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>       > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
>       > at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
>       > at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>       > at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:305)
>       > at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.serialize(PojoSerializer.java:362)
>       > at org.apache.flink.api.common.typeutils.CompositeSerializer.serialize(CompositeSerializer.java:142)
>       > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValueInternal(AbstractRocksDBState.java:158)
>       > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:178)
>       > at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeValue(AbstractRocksDBState.java:167)
>       > at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:106)
>       > ... 20 more
>       > Caused by: java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: org.apache.logging.log4j.core.layout.AbstractCsvLayout
>       > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:48)
>       > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:26)
>       > at com.esotericsoftware.kryo.Kryo.newDefaultSerializer(Kryo.java:351)
>       > at com.twitter.chill.KryoBase.newDefaultSerializer(KryoBase.scala:58)
>       > at com.esotericsoftware.kryo.Kryo.getDefaultSerializer(Kryo.java:344)
>       > at com.esotericsoftware.kryo.util.DefaultClassResolver.registerImplicit(DefaultClassResolver.java:56)
>       > at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:461)
>       > at com.twitter.chill.KryoBase.getRegistration(KryoBase.scala:52)
>       > at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
>       > at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
>       > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:239)
>       > at com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer.write(DefaultSerializers.java:232)
>       > at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577)
>       > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
>       > at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22)
>       > at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
>       > at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
>       > ... 62 more
>       > Caused by: java.lang.reflect.InvocationTargetException
>       > at sun.reflect.GeneratedConstructorAccessor42.newInstance(Unknown Source)
>       > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>       > at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>       > at com.esotericsoftware.kryo.factories.ReflectionSerializerFactory.makeSerializer(ReflectionSerializerFactory.java:35)
>       > ... 78 more
>       > Caused by: java.lang.NoClassDefFoundError: Lorg/apache/commons/csv/CSVFormat;
>       > at java.lang.Class.getDeclaredFields0(Native Method)
>       > at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
>       > at java.lang.Class.getDeclaredFields(Class.java:1916)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:193)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.rebuildCachedFields(FieldSerializer.java:156)
>       > at com.esotericsoftware.kryo.serializers.FieldSerializer.<init>(FieldSerializer.java:133)
>       > ... 82 more
>       > Caused by: java.lang.ClassNotFoundException: org.apache.commons.csv.CSVFormat
>       > at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>       > at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>       > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>       > at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>       > ... 88 more
>       >
>       > Any help would be great. I tried manually including CSVFormat from
>       > apache commons but didn't change anything.
>       >
>       > Many thanks,
>       > Chris Stevens
>       > Head of Research & Development
>       > +44 7565 034 595
>
>
