Re: StreamTableEnvironment can not run in batch mode

2020-02-19 Thread Jingsong Li
Hi Fanbin,

TableEnvironment is the unified entry point for batch and streaming in the blink planner.
Use: TableEnvironment.create(fsSettings)

We are continuing to improve TableEnvironment so that it covers more features.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#create-a-tableenvironment
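For example, a minimal sketch of a batch job entry point with the blink planner
(assuming Flink 1.9+; the commented table name and query are placeholders):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchTableJob {
    public static void main(String[] args) {
        // Blink planner in batch mode; no StreamTableEnvironment involved.
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();

        TableEnvironment tEnv = TableEnvironment.create(fsSettings);

        // Register tables via DDL/connectors, then run SQL as usual, e.g.:
        // tEnv.sqlQuery("SELECT * FROM my_table");
    }
}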

Best,
Jingsong Lee

On Thu, Feb 20, 2020 at 7:21 AM Fanbin Bu  wrote:

> Hi,
>
> Currently, "StreamTableEnvironment can not run in batch mode for now,
> please use TableEnvironment."
>
> Are there any plans on the batch/streaming unification roadmap to support
> StreamTableEnvironment for both streamingMode and batchMode?
>
> Thanks,
> Fanbin
>


-- 
Best, Jingsong Lee


JDBC source running continuously

2020-02-19 Thread Fanbin Bu
Hi,

My app creates a source from a JDBC InputFormat, runs some SQL, and prints
the result. But the source terminates itself after the query is done. Is
there any way to keep the source running?
Sample code:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tEnv = StreamTableEnvironment.create(env, settings)
val inputFormat = JDBCInputFormat.buildJDBCInputFormat.setQuery("select * from table")... .finish()
val source = env.createInput(inputFormat)
tEnv.registerTableSource(source)
val queryResult = tEnv.sqlQuery("select * from awesomeSource")
queryResult.insertInto(mySink)


I searched around and it's suggested to use .iterate(). Can somebody give
more examples on how to use it in this case?

Thanks,
Fanbin


Fwd: StreamTableEnvironment can not run in batch mode

2020-02-19 Thread Fanbin Bu
Hi,

Currently, "StreamTableEnvironment can not run in batch mode for now,
please use TableEnvironment."

Are there any plans on the batch/streaming unification roadmap to support
StreamTableEnvironment for both streamingMode and batchMode?

Thanks,
Fanbin


FlinkKinesisConsumer throws "Unknown datum type org.joda.time.DateTime"

2020-02-19 Thread Lian Jiang
Hi,

I use a FlinkKinesisConsumer in a Flink job to de-serialize kinesis events.

Flink: 1.9.2
Avro: 1.9.2

The serDe class is like:

public class ManagedSchemaKinesisPayloadSerDe<T>
    implements KinesisSerializationSchema<T>,
               KinesisDeserializationSchema<T> {

    private static final String REGISTRY_ENDPOINT =
        "https://schema-registry.my.net";
    private static final long serialVersionUID = -1L;
    private final Class<T> tClass;
    private String topic;

    public ManagedSchemaKinesisPayloadSerDe(final Class<T> tClass) {
        this.tClass = tClass;
        this.topic = null;
        SpecificData.get().addLogicalTypeConversion(
            new TimeConversions.TimestampConversion());
    }

    @Override
    public ByteBuffer serialize(T obj) {
        Properties props = new Properties();
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            REGISTRY_ENDPOINT);

        // some code to create schemaReg
        final KafkaAvroSerializer serializer =
            new KafkaAvroSerializer(schemaReg, new HashMap<>(props));
        return ByteBuffer.wrap(serializer.serialize(topic, obj));
    }

    @Override
    public T deserialize(
            byte[] record,
            String partitionKey,
            String sequenceNumber,
            long eventUtcTimestamp,
            String streamName,
            String shardId) throws IOException {
        Properties props = new Properties();
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
            Boolean.toString(true));
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
            REGISTRY_ENDPOINT);
        VerifiableProperties vProps = new VerifiableProperties(props);

        // some code to create schemaReg

        final KafkaAvroDecoder decoder = new KafkaAvroDecoder(schemaReg, vProps);
        return (T) decoder.fromBytes(record);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(tClass);
    }

} // end of class ManagedSchemaKinesisPayloadSerDe


// create consumer, stream environment:

ManagedSchemaKinesisPayloadSerDe<MyPoJoRecord> serDe =
    new ManagedSchemaKinesisPayloadSerDe<>(MyPoJoRecord.class);

final FlinkKinesisConsumer<MyPoJoRecord> consumer = new FlinkKinesisConsumer<>(
    streamName,
    serDe,
    streamConfig);

streamEnv
   .addSource(consumer)
   .print();
streamEnv.execute();


The exception:
java.lang.RuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:766)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:91)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:272)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:287)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:284)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:748)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:371)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:258)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroRuntimeException: Unknown datum type org.joda.time.DateTime: 2020-02-16T19:14:20.983Z
at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:772)
at ...

Re: Flink job fails with org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

2020-02-19 Thread John Smith
I think so too. But I was wondering whether this was the consumer or the actual
Kafka broker. The error was displayed on the Flink task node where the task was
running; the brokers looked fine at the time.
I have about a dozen topics, all single-partition except one which has 18
partitions, so I really doubt the broker machines ran out of file handles.

On Mon, 17 Feb 2020 at 05:21, Piotr Nowojski  wrote:

> Hey, sorry but I know very little about the KafkaConsumer. I hope that
> someone else might know more.
>
> However, did you try to google this issue? It doesn’t sound like a
> Flink-specific problem, but like a general Kafka issue. Also, a solution
> might be as simple as bumping the limit of open files on the unix system
> (the ulimit command, if I remember correctly?)
>
> Piotrek
>
> On 14 Feb 2020, at 23:35, John Smith  wrote:
>
> Hi Piotr, any thoughts on this?
>
> On Wed., Feb. 12, 2020, 3:29 a.m. Kostas Kloudas, 
> wrote:
>
>> Hi John,
>>
>> As you suggested, I would also lean towards increasing the number of
>> allowed open handles, but
>> for recommendation on best practices, I am cc'ing Piotr who may be
>> more familiar with the Kafka consumer.
>>
>> Cheers,
>> Kostas
>>
>> On Tue, Feb 11, 2020 at 9:43 PM John Smith 
>> wrote:
>> >
>> > Just wondering is this on the client side in the flink Job? I rebooted
>> the task and the job deployed correctly on another node.
>> >
>> > Is there a specific ulimit that we should set for flink tasks nodes?
>> >
>> > org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
>> > at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>> > at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>> > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
>> > at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>> > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>> > at java.lang.Thread.run(Thread.java:748)
>> > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
>> > at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
>> > ... 11 more
>> > Caused by: java.io.IOException: Too many open files
>> > at sun.nio.ch.IOUtil.makePipe(Native Method)
>> > at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
>> > at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
>> > at java.nio.channels.Selector.open(Selector.java:227)
>> > at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
>> > ... 14 more
>>
>
>


Re: Side Outputs from RichAsyncFunction

2020-02-19 Thread KristoffSC
Hi,
any thoughts about this one?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Tests in FileUtilsTest while building Flink in local

2020-02-19 Thread Arujit Pradhan
Hi all,

I was trying to build Flink on my local machine and these two unit tests
are failing.



[ERROR] Errors:
[ERROR]   FileUtilsTest.testCompressionOnRelativePath:261->verifyDirectoryCompression:440 » NoSuchFile
[ERROR]   FileUtilsTest.testDeleteDirectoryConcurrently » FileSystem /var/folders/x9/tr2...

I am building with these versions:
Java 1.8.0_221
maven 3.6.3
and the OS is macOS Catalina (10.15).

Did anyone face this issue? Am I missing something?

The stack trace is:
java.nio.file.NoSuchFileException: ../../../../../../../../var/folders/x9/tr2xclq51sx891lbntv7bwy4gn/T/junit3367096668518353289/compressDir/rootDir

at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at java.base/sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:389)
at java.base/java.nio.file.Files.createDirectory(Files.java:689)
at org.apache.flink.util.FileUtilsTest.verifyDirectoryCompression(FileUtilsTest.java:440)
at org.apache.flink.util.FileUtilsTest.testCompressionOnRelativePath(FileUtilsTest.java:261)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

Thanks in advance.


StreamTableEnvironment can not run in batch mode

2020-02-19 Thread Fanbin Bu
Hi,

Currently, "StreamTableEnvironment can not run in batch mode for now,
please use TableEnvironment."

Are there any plans on the batch/streaming unification roadmap to support
StreamTableEnvironment for both streamingMode and batchMode?

Thanks,
Fanbin


Re: Link to Flink on K8S Webinar

2020-02-19 Thread Austin Bennett
Cool; @aniket and @dagang,

As someone who hasn't dug into the code of either (will go through your
recording) -- might you share any thoughts on differences between:
https://github.com/googlecloudplatform/flink-on-k8s-operator
and
https://github.com/lyft/flinkk8soperator
??


Also, for those in Bay Area (or to attend GCP NEXT), we'll have an
in-person talk touching on things related in April:
https://www.meetup.com/San-Francisco-Apache-Beam/events/268674177/



On Tue, Feb 18, 2020 at 11:48 AM Aizhamal Nurmamat kyzy 
wrote:

> Hi folks,
>
> Recently Aniket Mokashi and Dagang Wei hosted a webinar on how to use the
> flink k8s operator they have developed. The operator also supports working
> with Beam.
>
> If you think that this may be helpful to you, you may access the recording
> and slides via this link:
> https://www.cncf.io/webinars/operating-os-flink-beam-runtime-kubernetes/
>
> Thanks,
> Aizhamal
>


Re: Flink Kafka connector consume from a single kafka partition

2020-02-19 Thread hemant singh
Hi Arvid,

Thanks for your response. I think I did not word my question properly.
I wanted to confirm that if the data is distributed to more than one
partition then the ordering cannot be maintained (which is documented).
According to your response, I understand that if I set the parallelism to the
number of partitions, then each consumer will consume from one partition and
ordering can be maintained.

However, I have a question here: in case my parallelism is less than the number
of partitions, I still believe that if I create a keyedstream, ordering will be
maintained at the operator level for that key. Correct me if I am wrong.

Second, one issue/challenge which I see with this model: if one source pushes
data at a very high frequency, then one partition is overloaded, and hence the
task which processes it will be overloaded too. However, for maintaining
ordering I do not have any other option but to keep the data in one partition.

Thanks,
Hemant

On Wed, Feb 19, 2020 at 5:54 PM Arvid Heise  wrote:

> Hi Hemant,
>
> Flink passes your configurations to the Kafka consumer, so you could check
> if you can subscribe to only one partition there.
>
> However, I would discourage that approach. I don't see the benefit over just
> subscribing to the topic entirely and having dedicated processing for the
> different devices.
>
> If you are concerned about the order, you shouldn't. Since all events of a
> specific device-id reside in the same source partition, events are in-order
> in Kafka (responsibility of producer, but I'm assuming that because of your
> mail) and thus they are also in order in non-keyed streams in Flink. Any
> keyBy on device-id or composite key involving device-id, would also retain
> the order.
>
> If you have exactly one partition per device-id, you could even go with
> `DataStreamUtils#reinterpretAsKeyedStream` to avoid any shuffling.
>
> Let me know if I misunderstood your use case or if you have further
> questions.
>
> Best,
>
> Arvid
>
> On Wed, Feb 19, 2020 at 8:39 AM hemant singh  wrote:
>
>> Hello Flink Users,
>>
>> I have a use case where I am processing metrics from different types of
>> sources (one source will have multiple devices), and for aggregations as
>> well as for building alerts the order of messages is important. To maintain
>> customer data segregation I plan to have a single topic for each customer,
>> with each source streaming data to one kafka partition.
>> To maintain ordering I am planning to push data for a single source type
>> to a single partition. Then I can create a keyedstream so that for each
>> device-id I have a single stream with ordered data.
>>
>> However, with the flink-kafka consumer I don't see that I can read from a
>> specific partition, hence the flink consumer reads from multiple kafka
>> partitions. So even if I try to create a keyedstream on source type (and
>> then write to a partition for further processing, like a keyedstream on
>> device-id), I think ordering will not be maintained per source type.
>>
>> The only other option I feel I am left with is to have a single partition
>> for the topic so that flink can subscribe to the topic and ordering is
>> maintained; the challenge is too many topics (as I have this configuration
>> for multiple customers), which is not advisable for a kafka cluster.
>>
>> Can anyone shed some light on how to handle this use case.
>>
>> Thanks,
>> Hemant
>>
>


Re: Updating ValueState not working in hosted Kinesis

2020-02-19 Thread Chris Stevens
Thanks so much Timo, got it working now. All down to my lack of Java skill.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 15:12, Timo Walther  wrote:

> Hi Chris,
>
> your observation is right. By `new Sensor() {}` instead of just `new
> Sensor()` you are creating an anonymous non-static class that references
> the outer method and class.
>
> If you check your logs, there might be also a reason why your POJO is
> used as a generic type. I assume because you declared `implements
> Serializable` which forces Flink to think "user wants to deal with
> serialization of the POJO himself".
>
> Regards,
> Timo
>
>
> On 19.02.20 14:24, Chris Stevens wrote:
> > Thanks again Timo, I hope I replied correctly this time.
> >
> > As per my previous message the Sensor class is a very simple POJO type
> > (I think).
> >
> > When the serialization trace talks about PGSql stuff it makes me think
> > that something from my operator is being included in serialization. Not
> > just the Sensor object itself which I am explicitly including in state.
> >
> > package sensingfeeling.functions.mapping;
> >
> > public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {
> >
> >     private static final long serialVersionUID = 8582433437601788991L;
> >
> >     private transient ValueState<Sensor> sensorState;
> >
> >     @Override
> >     public TypeA join(TypeB frame, TypeC activeMotionPaths) throws JsonProcessingException {
> >
> >         Sensor sensor = sensorState.value();
> >         if (sensor == null) {
> >             LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
> >             sensor = getSensor(frame);
> >             sensorState.update(sensor);
> >         }
> >
> >         return new TypeA();
> >     }
> >
> >     @Override
> >     public void open(Configuration config) {
> >         LOG.debug("Sensor open method called", config);
> >
> >         StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
> >             .cleanupInBackground()
> >             .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> >             .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
> >             .build();
> >
> >         ValueStateDescriptor<Sensor> sensorStateDescriptor = new ValueStateDescriptor<>(
> >             "sensor", TypeInformation.of(new TypeHint<Sensor>() {}));
> >         // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
> >         sensorState = getRuntimeContext().getState(sensorStateDescriptor);
> >     }
> >
> >     private Sensor getSensor(TypeB frame) throws Exception {
> >
> >         Class.forName("org.postgresql.Driver");
> >         try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
> >              Statement st = con.createStatement();
> >              ResultSet rs = st.executeQuery(
> >                  "SELECT * from sensor where sensorid = '" + frame.sensorId + "'")) {
> >
> >             if (rs.next()) {
> >                 Sensor sensor = new Sensor() {};
> >
> >                 LOG.debug("Got sensor" + sensor);
> >
> >                 return sensor;
> >             }
> >
> >         } catch (SQLException ex) {
> >             LOG.error("Error when connection postgres", ex);
> >             throw ex;
> >         }
> >
> >         return null;
> >     }
> >
> > }
> >
> > Above is a cut down version of my operator, I'm guessing it is the
> > ResultSet rs that is getting serialized. How do I prevent this
> > undesirable behaviour? I'm quite happy for my solution to serialize only
> > what I explicitly tell it to, I don't need exactly once or anything.
> >
> > Many thanks,
> > Chris Stevens
> > Head of Research & Development
> > +44 7565 034 595
> >
> >
> > On Wed, 19 Feb 2020 at 12:19, Timo Walther  > > wrote:
> >
> > Hi Chris,
> >
> > [forwarding the private discussion to the mailing list again]
> >
> > first of all, are you sure that your Sensor class is either a
> top-level
> > class or a static inner class. Because it seems there is way more
> stuff
> > in it (maybe included by accident transitively?). Such as:
> >
> > org.apache.logging.log4j.core.layout.AbstractCsvLayout
> >> Serialization trace:
> >> classes (sun.misc.Launcher$AppClassLoader)
> >> classloader (java.security.ProtectionDomain)
> >> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> >> combiner (java.security.AccessControlContext)
> >> acc (sun.security.ssl.SSLSocketImpl)
> >> connection (org.postgresql.core.PGStream)
> >> pgStream (org.postgresql.core.v3.QueryExecutorImpl)
> >> transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
> >> commitQuery (org.postgresql.jdbc.PgConnection)
> >> connection (org.postgresql.jdbc.PgResultSet)
> >> val$rs
> >
> > When declaring state you can use
> >
>  `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class)` to
> >
> > check if your state is a POJO type.
> >
> > Regards,
> > Timo
> >
> >
> >  Forwarded Message 
> > Subject:Re: Updating ValueState not working in hosted Kinesis
> > Date:   Wed, 19 Feb 2020 12:02:16 +
> > From:   Chris Stevens  > >
> > 

Re: CSV StreamingFileSink

2020-02-19 Thread Austin Cawley-Edwards
Hey Timo,

Thanks for the assignment link! Looks like most of my issues can be solved
by getting better acquainted with Java file APIs and not in Flink-land.
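For reference, a minimal sketch of the daily-directory layout based on the
bucket-assignment docs linked below (row format with plain strings; the output
path and encoder here are placeholders, not my actual setup):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

public class DailyCsvSink {
    public static StreamingFileSink<String> build() {
        // One directory per day, e.g. .../2020-02-19/part-0-0
        return StreamingFileSink
            .forRowFormat(new Path("s3://my-bucket/output"),
                new SimpleStringEncoder<String>("UTF-8"))
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
            .build();
    }
}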


Best,
Austin

On Wed, Feb 19, 2020 at 6:48 AM Timo Walther  wrote:

> Hi Austin,
>
> the StreamingFileSink allows bucketing the output data.
>
> This should help for your use case:
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment
>
> Regards,
> Timo
>
>
> On 19.02.20 01:00, Austin Cawley-Edwards wrote:
> > Following up on this -- does anyone know if it's possible to stream
> > individual files to a directory using the StreamingFileSink? For
> > instance, if I want all records that come in during a certain day to be
> > partitioned into daily directories:
> >
> > 2020-02-18/
> > large-file-1.txt
> > large-file-2.txt
> > 2020-02-19/
> > large-file-3.txt
> >
> > Or is there another way to accomplish this?
> >
> > Thanks!
> > Austin
> >
> > On Tue, Feb 18, 2020 at 5:33 PM Austin Cawley-Edwards
> > mailto:austin.caw...@gmail.com>> wrote:
> >
> > Hey all,
> >
> > Has anyone had success using the StreamingFileSink[1] to write CSV
> > files? And if so, what about compressed (Gzipped, ideally) files/
> > which libraries did you use?
> >
> >
> > Best,
> > Austin
> >
> >
> > [1]:
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
> >
>
>


Re: BucketingSink capabilities for DataSet API

2020-02-19 Thread aj
Thanks, Timo. I have not used or explored the Table API until now; I have used
the DataSet and DataStream APIs only.
I will read about the Table API.

On Wed, Feb 19, 2020 at 4:33 PM Timo Walther  wrote:

> Hi Anuj,
>
> another option would be to use the new Hive connectors. Have you looked
> into those? They might work on SQL internal data types which is why you
> would need to use the Table API then.
>
> Maybe Bowen in CC can help you here.
>
> Regards,
> Timo
>
> On 19.02.20 11:14, Rafi Aroch wrote:
> > Hi Anuj,
> >
> > It's been a while since I wrote this (Flink 1.5.2). Could be a
> > better/newer way, but this is what how I read & write Parquet with
> > hadoop-compatibility:
> >
> > // imports
> > import org.apache.avro.generic.GenericRecord;
> > import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> >
> > import
> org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
> >
> > import org.apache.flink.hadoopcompatibility.HadoopInputs;
> > import org.apache.hadoop.conf.Configuration;
> > import org.apache.hadoop.fs.Path;
> > import org.apache.hadoop.mapreduce.Job;
> > import org.apache.parquet.avro.AvroParquetInputFormat;
> >
> > // Creating Parquet input format
> > Configuration conf = new Configuration();
> > Job job = Job.getInstance(conf);
> > AvroParquetInputFormat parquetInputFormat = new
> > AvroParquetInputFormat<>();
> > AvroParquetInputFormat.setInputDirRecursive(job, true);
> > AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> > HadoopInputFormat inputFormat
> > = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> > GenericRecord.class, job);
> >
> > // Creating Parquet output format
> > AvroParquetOutputFormat parquetOutputFormat = new
> > AvroParquetOutputFormat<>();
> > AvroParquetOutputFormat.setSchema(job, new
> > Schema.Parser().parse(SomeEvent.SCHEMA));
> > AvroParquetOutputFormat.setCompression(job,
> > CompressionCodecName.SNAPPY);
> > AvroParquetOutputFormat.setCompressOutput(job, true);
> > AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> > HadoopOutputFormat outputFormat = new
> > HadoopOutputFormat<>(parquetOutputFormat, job);
> >
> > DataSource> inputFileSource =
> > env.createInput(inputFormat);
> >
> > // Start processing...
> >
> > // Writing result as Parquet
> > resultDataSet.output(outputFormat);
> >
> >
> > Regarding writing partitioned data, as far as I know, there is no way to
> > achieve that with the DataSet API with hadoop-compatibility.
> >
> > You could implement this with reading from input files as stream and
> > then using StreamingFileSink with a custom BucketAssigner [1].
> > The problem with that (which was not yet resolved AFAIK) is described
> > here [2] in "Important Notice 2".
> >
> > Sadly I say, that eventually, for this use-case I chose Spark to do the
> > job...
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> > [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
> >
> > Hope this helps.
> >
> > Rafi
> >
> >
> > On Sat, Feb 15, 2020 at 5:03 PM aj  > > wrote:
> >
> > Hi Rafi,
> >
> > I have a similar use case where I want to read parquet files in the
> > dataset and want to perform some transformation and similarly want
> > to write the result using year month day partitioned.
> >
> > I am stuck at first step only where how to read and write
> > Parquet files using hadoop-Compatability.
> >
> > Please help me with this and also if u find the solution for how to
> > write data in partitioned.
> >
> > Thanks,
> > Anuj
> >
> > On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin
> > mailto:and...@data-artisans.com>> wrote:
> >
> > Hi Rafi,
> >
> > At the moment I do not see any support of Parquet in DataSet API
> > except HadoopOutputFormat, mentioned in stack overflow question.
> > I have cc’ed Fabian and Aljoscha, maybe they could provide more
> > information.
> >
> > Best,
> > Andrey
> >
> >> On 25 Oct 2018, at 13:08, Rafi Aroch  >> > wrote:
> >>
> >> Hi,
> >>
> >> I'm writing a Batch job which reads Parquet, does some
> >> aggregations and writes back as Parquet files.
> >> I would like the output to be partitioned by year, month, day
> >> by event time. Similarly to the functionality of the
> >> BucketingSink.
> >>
> >> I was able to achieve the reading/writing to/from Parquet by
> >> using the hadoop-compatibility features.
> >> I couldn't find a way to partition the data by year, month,
> >> day to create a folder hierarchy accordingly. Everything is
> >> written to a 

Re: BucketingSink capabilities for DataSet API

2020-02-19 Thread aj
Thanks, Rafi. I will try this, but yes, if partitioning is not possible
then I also have to look for some other solution.
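Something like the custom event-time BucketAssigner below is what I understand
from your suggestion (a rough sketch only; the record type and timestamp field
are placeholders, not my actual schema):

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Placeholder record type carrying an event-time timestamp.
class RecordWithTime {
    public long eventTimeMillis;
}

public class EventTimeBucketAssigner implements BucketAssigner<RecordWithTime, String> {

    private static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy/MM/dd").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(RecordWithTime element, Context context) {
        // e.g. "2020/02/19" -> one folder per year/month/day, by event time.
        return FMT.format(Instant.ofEpochMilli(element.eventTimeMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}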

On Wed, Feb 19, 2020 at 3:44 PM Rafi Aroch  wrote:

> Hi Anuj,
>
> It's been a while since I wrote this (Flink 1.5.2). Could be a
> better/newer way, but this is what how I read & write Parquet with
> hadoop-compatibility:
>
> // imports
>> import org.apache.avro.generic.GenericRecord;
>> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>>
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
>
> import org.apache.flink.hadoopcompatibility.HadoopInputs;
>> import org.apache.hadoop.conf.Configuration;
>> import org.apache.hadoop.fs.Path;
>> import org.apache.hadoop.mapreduce.Job;
>> import org.apache.parquet.avro.AvroParquetInputFormat;
>>
>> // Creating Parquet input format
>> Configuration conf = new Configuration();
>> Job job = Job.getInstance(conf);
>> AvroParquetInputFormat parquetInputFormat = new
>> AvroParquetInputFormat<>();
>> AvroParquetInputFormat.setInputDirRecursive(job, true);
>> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
>> HadoopInputFormat inputFormat
>> = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
>> GenericRecord.class, job);
>>
>
>
>> // Creating Parquet output format
>> AvroParquetOutputFormat parquetOutputFormat = new
>> AvroParquetOutputFormat<>();
>> AvroParquetOutputFormat.setSchema(job, new
>> Schema.Parser().parse(SomeEvent.SCHEMA));
>> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
>> AvroParquetOutputFormat.setCompressOutput(job, true);
>> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
>> HadoopOutputFormat outputFormat = new
>> HadoopOutputFormat<>(parquetOutputFormat, job);
>
>
>
> DataSource> inputFileSource =
>> env.createInput(inputFormat);
>
>
>
> // Start processing...
>
>
>
> // Writing result as Parquet
>> resultDataSet.output(outputFormat);
>
>
> Regarding writing partitioned data, as far as I know, there is no way to
> achieve that with the DataSet API with hadoop-compatibility.
>
> You could implement this with reading from input files as stream and then
> using StreamingFileSink with a custom BucketAssigner [1].
> The problem with that (which was not yet resolved AFAIK) is described here
> [2] in "Important Notice 2".
>
> Sadly I say, that eventually, for this use-case I chose Spark to do the
> job...
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general
>
> Hope this helps.
>
> Rafi
>
>
> On Sat, Feb 15, 2020 at 5:03 PM aj  wrote:
>
>> Hi Rafi,
>>
>> I have a similar use case where I want to read parquet files in the
>> dataset and want to perform some transformation and similarly want to write
>> the result using year month day partitioned.
>>
>> I am stuck at first step only where how to read and write Parquet files
>> using hadoop-Compatability.
>>
>> Please help me with this and also if u find the solution for how to write
>> data in partitioned.
>>
>> Thanks,
>> Anuj
>>
>>
>> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin 
>> wrote:
>>
>>> Hi Rafi,
>>>
>>> At the moment I do not see any support of Parquet in DataSet API
>>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>>> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>>>
>>> Best,
>>> Andrey
>>>
>>> On 25 Oct 2018, at 13:08, Rafi Aroch  wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a Batch job which reads Parquet, does some aggregations and
>>> writes back as Parquet files.
>>> I would like the output to be partitioned by year, month, day by event
>>> time. Similarly to the functionality of the BucketingSink.
>>>
>>> I was able to achieve the reading/writing to/from Parquet by using the
>>> hadoop-compatibility features.
>>> I couldn't find a way to partition the data by year, month, day to
>>> create a folder hierarchy accordingly. Everything is written to a single
>>> directory.
>>>
>>> I could find an unanswered question about this issue:
>>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>>
>>> Can anyone suggest a way to achieve this? Maybe there's a way to
>>> integrate the BucketingSink with the DataSet API? Another solution?
>>>
>>> Rafi
>>>
>>>
>>>
>>
>> --
>> Thanks & Regards,
>> Anuj Jain
>> Mob. : +91- 8588817877
>> Skype : anuj.jain07
>> 
>>
>>
>> 
>>
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07






RE: KafkaFetcher closed before end of stream is received for all partitions.

2020-02-19 Thread Bill Wicker
Thanks for the update! Since we are still in the planning stage I will try to 
find another way to achieve what we are trying to do in the meantime and I'll 
keep an eye on that Jira. Two workarounds I thought about are to either match 
the parallelism of the source to the partition count, or since this is a batch 
process that will be triggered, I can pass in a timestamp to start consuming 
from so that any lingering end of stream message from a previous run can be 
skipped.
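For the second workaround, something like this is what I have in mind (a rough
sketch, assuming the universal Kafka connector; topic, servers and the
timestamp offset are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class TimestampStartJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "batch-run");

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Skip any lingering end-of-stream marker from a previous run by only
        // reading records written after the given epoch-millis timestamp.
        consumer.setStartFromTimestamp(System.currentTimeMillis() - 60 * 60 * 1000L);

        env.addSource(consumer).print();
        env.execute("timestamp start");
    }
}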

Bill Wicker

-Original Message-
From: Tzu-Li Tai  
Sent: Monday, February 17, 2020 1:42 AM
To: user@flink.apache.org
Subject: Re: KafkaFetcher closed before end of stream is received for all 
partitions.

Hi,

Your observations are correct.
It is expected that the result of `KafkaDeserializationSchema#isEndOfStream`
triggers a single subtask to escape its fetch loop. Therefore, if a subtask is 
assigned multiple partitions, as soon as one record (regardless of which 
partition it came from) signals end of stream, then the subtask ends.

I'm afraid there is probably no good solution to this given the ill-defined 
semantics of the `isEndOfStream` method. All reasonable approaches that come to 
mind require some sort of external trigger to manually shut down the job.

For now, I've filed a JIRA to propose a possible solution to the semantics of 
the method: https://issues.apache.org/jira/browse/FLINK-16112



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Updating ValueState not working in hosted Kinesis

2020-02-19 Thread Timo Walther

Hi Chris,

your observation is right. By `new Sensor() {}` instead of just `new 
Sensor()` you are creating an anonymous non-static class that references 
the outer method and class.


If you check your logs, there might be also a reason why your POJO is 
used as a generic type. I assume because you declared `implements 
Serializable` which forces Flink to think "user wants to deal with 
serialization of the POJO himself".


Regards,
Timo


On 19.02.20 14:24, Chris Stevens wrote:

Thanks again Timo, I hope I replied correctly this time.

As per my previous message the Sensor class is a very simple POJO type 
(I think).


When the serialization trace talks about PGSql stuff it makes me think 
that something from my operator is being included in serialization. Not 
just the Sensor object itself which I am explicitly including in state.


package sensingfeeling.functions.mapping;

public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {

    private static final long serialVersionUID = 8582433437601788991L;

    private transient ValueState<Sensor> sensorState;

    @Override
    public TypeA join(TypeB frame, TypeC activeMotionPaths) throws JsonProcessingException {

        Sensor sensor = sensorState.value();
        if (sensor == null) {
            LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
            sensor = getSensor(frame);
            sensorState.update(sensor);
        }

        return new TypeA();
    }

    @Override
    public void open(Configuration config) {
        LOG.debug("Sensor open method called", config);

        StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
            .cleanupInBackground()
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
            .build();

        ValueStateDescriptor<Sensor> sensorStateDescriptor = new ValueStateDescriptor<>(
            "sensor", TypeInformation.of(new TypeHint<Sensor>() {}));
        // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
        sensorState = getRuntimeContext().getState(sensorStateDescriptor);
    }

    private Sensor getSensor(TypeB frame) throws Exception {

        Class.forName("org.postgresql.Driver");
        try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT * from sensor where sensorid = '" + frame.sensorId + "'")) {

            if (rs.next()) {
                Sensor sensor = new Sensor() {};

                LOG.debug("Got sensor" + sensor);

                return sensor;
            }

        } catch (SQLException ex) {
            LOG.error("Error when connection postgres", ex);
            throw ex;
        }

        return null;
    }

}

Above is a cut down version of my operator, I'm guessing it is the 
ResultSet rs that is getting serialized. How do I prevent this 
undesirable behaviour? I'm quite happy for my solution to serialize only 
what I explicitly tell it to, I don't need exactly once or anything.


Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 12:19, Timo Walther > wrote:


Hi Chris,

[forwarding the private discussion to the mailing list again]

first of all, are you sure that your Sensor class is either a top-level
class or a static inner class. Because it seems there is way more stuff
in it (maybe included by accident transitively?). Such as:

org.apache.logging.log4j.core.layout.AbstractCsvLayout
       > Serialization trace:
       > classes (sun.misc.Launcher$AppClassLoader)
       > classloader (java.security.ProtectionDomain)
       > cachedPDs (javax.security.auth.SubjectDomainCombiner)
       > combiner (java.security.AccessControlContext)
       > acc (sun.security.ssl.SSLSocketImpl)
       > connection (org.postgresql.core.PGStream)
       > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
       > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
       > commitQuery (org.postgresql.jdbc.PgConnection)
       > connection (org.postgresql.jdbc.PgResultSet)
       > val$rs

When declaring state you can use
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class)` to

check if your state is a POJO type.

Regards,
Timo


 Forwarded Message 
Subject:        Re: Updating ValueState not working in hosted Kinesis
Date:   Wed, 19 Feb 2020 12:02:16 +
From:   Chris Stevens mailto:ch...@sensingfeeling.com>>
To:     Timo Walther mailto:twal...@apache.org>>



Hi Timo,

Thanks for your reply. This makes sense to me, how do I treat something
as a POJO instead of a generic serialized BB type? Sorry relatively new
to Java and Flink.

This is my full class def:

package sensingfeeling.models;
import java.io.Serializable;

public class Sensor implements Serializable {

       private static final long serialVersionUID =
8582433437601788991L;
       public String sensorId;
       public String companyId;
       public String label;
       // public Date createdAt;
       // public Date updatedAt;
       public Integer 

Re: Process stream multiple time with different KeyBy

2020-02-19 Thread Lehuede sebastien
Hi guys,

Thanks for your answers and sorry for the late reply.

My use case is :

I receive some events on one stream; each event can contain:


   - 1 field category
   - 1 field subcategory
   - 1 field category AND 1 field subcategory

Events are matched against rules which can contain :


   - 1 field category
   - 1 field subcategory
   - 1 field category AND 1 field subcategory


Now, let's say I receive an event containing the following fields:
category=foo and subcategory=bar. I want to be able to match this event
against a rule also containing category=foo and subcategory=bar in its
specification, but I also want to be able to match this event against rules
containing only category=foo or only subcategory=bar in their
specification.

But I think I already have a lot of information in your answers. I will
definitely take a look at the Fraud Detection System example for the
DynamicKeyFunction, and try to work with 2 different streams (one stream
for events with a single key and one stream for events with a composite
key) as suggested by Till.
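Something like the sketch below is how I picture the two keyings on the same
stream (just a rough draft; the Event type and the key choices are
placeholders):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Placeholder event type with the category/subcategory fields from above.
class Event {
    public String category;
    public String subcategory;
}

public class TwoKeyings {
    public static void wire(DataStream<Event> input) {
        // The same input stream, partitioned two different ways.
        KeyedStream<Event, String> byCategory = input.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return e.category;
            }
        });
        KeyedStream<Event, String> byBoth = input.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event e) {
                return e.category + "|" + e.subcategory;
            }
        });
        // Each keyed stream can then be connected to its own (broadcast) rule stream.
    }
}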

Thanks again !
Sébastien

On Tue, Feb 18, 2020 at 6:16 AM Till Rohrmann  wrote:

> Hi Sébastien,
>
> there is always the possibility to reuse a stream. Given a
> DataStream input, you can do the following:
>
> KeyedStream a = input.keyBy(x -> f(x));
> KeyedStream b = input.keyBy(x -> g(x));
>
> This gives you two differently partitioned streams a and b.
>
> If you want to evaluate every event against the full set of rules, then
> you could take a look at Flink Broadcast State Pattern [1]. It allows you
> to broadcast a stream of rules to all operators of a keyed input stream.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>
> Cheers,
> Till
>
> On Mon, Feb 17, 2020 at 11:10 PM theo.diefent...@scoop-software.de <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Sebastian,
>> I'd also highly recommend a recent Flink blog post to you where exactly
>> this question was answered in quite some detail:
>> https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>> Best regards, Theo
>>  Ursprüngliche Nachricht 
>> Von: Eduardo Winpenny Tejedor 
>> Datum: Mo., 17. Feb. 2020, 21:07
>> An: Lehuede sebastien 
>> Cc: user 
>> Betreff: Re: Process stream multiple time with different KeyBy
>>
>>
>> Hi Sebastien,
>>
>> Without being entirely sure of what's your use case/end goal I'll tell
>> you (some of) the options Flink provides you for defining a flow.
>>
>> If your use case is to apply the same rule to each of your "swimlanes"
>> of data (one with category=foo AND subcategory=bar, another with
>> category=foo and another with category=bar) you can do this by
>> implementing your own org.apache.flink.api.java.functions.KeySelector
>> function for the keyBy function. You'll just need to return a
>> different key for each of your rules and the data will separate to the
>> appropriate "swimlane".
>>
>> If your use case is to apply different rules to each swimlane then you
>> can write a ProcessFunction that redirects elements to different *side
>> outputs*. You can then apply different operations to each side output.
>>
>> Your application could get tricky to evolve IF the number of swimlanes
>> or the operators are meant to change over time, you'd have to be
>> careful how the existing state fits into your new flows.
>>
>> Regards,
>> Eduardo
>>
>> On Mon, Feb 17, 2020 at 7:06 PM Lehuede sebastien 
>> wrote:
>> >
>> > Hi all,
>> >
>> > I'm currently working on a Flink Application where I match events
>> against a set of rules. At the beginning I wanted to dynamically create
>> streams following the category of events (Event are JSON formatted and I've
>> a field like "category":"foo" in each event) but I'm stuck by the
>> impossibility to create streams at runtime.
>> >
>> > So, one of the solution for me is to create a single Kafka topic and
>> then use the "KeyBy" operator to match events with "category":"foo" against
>> rules also containing "category":"foo" in rule specification.
>> >
>> > Now I have some cases where events and rules have one category and one
>> subcategory. At this point I'm not sure about the "KeyBy" operator behavior.
>> >
>> > Example :
>> >
>> > Events have : "category":"foo" AND "subcategory":"bar"
>> > Rule1 specification has : "category":"foo" AND "subcategory":"bar"
>> > Rule2 specification has : "category':"foo"
>> > Rule3 specification has : "category":"bar"
>> >
>> > In this case, my events need to be match against Rule1, Rule2 and Rule3.
>> >
>> > If I'm right, if I apply a multiple key "KeyBy()" with "category" and
>> "subcategory" fields and then apply two single key "KeyBy()" with
>> "category" field, my events will be consumed by the first "KeyBy()"
>> operator and no events will be streamed in the operators after ?
>> >
>> > Is there any way to process the same stream one time for multi key
>> KeyBy() and another time for single key KeyBy() ?
>> >
>> > 

Re: Re: Updating ValueState not working in hosted Kinesis

2020-02-19 Thread Chris Stevens
Thanks again Timo, I hope I replied correctly this time.

As per my previous message the Sensor class is a very simple POJO type (I
think).

When the serialization trace talks about PGSql stuff it makes me think that
something from my operator is being included in serialization. Not just the
Sensor object itself which I am explicitly including in state.

package sensingfeeling.functions.mapping;

public final class ArbJoinFunction extends RichJoinFunction<TypeB, TypeC, TypeA> {

    private static final long serialVersionUID = 8582433437601788991L;

    private transient ValueState<Sensor> sensorState;

    @Override
    public TypeA join(TypeB frame, TypeC activeMotionPaths) throws JsonProcessingException {

        Sensor sensor = sensorState.value();
        if (sensor == null) {
            LOG.debug("Sensor was not in state, getting sensor: " + frame.sensorId);
            sensor = getSensor(frame);
            sensorState.update(sensor);
        }

        return new TypeA();
    }

    @Override
    public void open(Configuration config) {
        LOG.debug("Sensor open method called", config);

        StateTtlConfig sensorTtlConfig = StateTtlConfig.newBuilder(Time.minutes(1))
            .cleanupInBackground()
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
            .build();

        ValueStateDescriptor<Sensor> sensorStateDescriptor = new ValueStateDescriptor<>(
            "sensor", TypeInformation.of(new TypeHint<Sensor>() {}));
        // sensorStateDescriptor.enableTimeToLive(sensorTtlConfig);
        sensorState = getRuntimeContext().getState(sensorStateDescriptor);
    }

    private Sensor getSensor(TypeB frame) throws Exception {

        Class.forName("org.postgresql.Driver");
        try (Connection con = DriverManager.getConnection(dbURL, dbUser, dbPassword);
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery(
                 "SELECT * from sensor where sensorid = '" + frame.sensorId + "'")) {

            if (rs.next()) {
                Sensor sensor = new Sensor() {};

                LOG.debug("Got sensor" + sensor);

                return sensor;
            }

        } catch (SQLException ex) {
            LOG.error("Error when connection postgres", ex);
            throw ex;
        }

        return null;
    }

}

Above is a cut down version of my operator, I'm guessing it is the
ResultSet rs that is getting serialized. How do I prevent this undesirable
behaviour? I'm quite happy for my solution to serialize only what I
explicitly tell it to, I don't need exactly once or anything.

Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 12:19, Timo Walther  wrote:

> Hi Chris,
>
> [forwarding the private discussion to the mailing list again]
>
> first of all, are you sure that your Sensor class is either a top-level
> class or a static inner class. Because it seems there is way more stuff
> in it (maybe included by accident transitively?). Such as:
>
> org.apache.logging.log4j.core.layout.AbstractCsvLayout
>   > Serialization trace:
>   > classes (sun.misc.Launcher$AppClassLoader)
>   > classloader (java.security.ProtectionDomain)
>   > cachedPDs (javax.security.auth.SubjectDomainCombiner)
>   > combiner (java.security.AccessControlContext)
>   > acc (sun.security.ssl.SSLSocketImpl)
>   > connection (org.postgresql.core.PGStream)
>   > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
>   > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
>   > commitQuery (org.postgresql.jdbc.PgConnection)
>   > connection (org.postgresql.jdbc.PgResultSet)
>   > val$rs
>
> When declaring state you can use
> `org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class)` to
> check if your state is a POJO type.
>
> Regards,
> Timo
>
>
>  Forwarded Message 
> Subject:Re: Updating ValueState not working in hosted Kinesis
> Date:   Wed, 19 Feb 2020 12:02:16 +
> From:   Chris Stevens 
> To: Timo Walther 
>
>
>
> Hi Timo,
>
> Thanks for your reply. This makes sense to me, how do I treat something
> as a POJO instead of a generic serialized BB type? Sorry relatively new
> to Java and Flink.
>
> This is my full class def:
>
> package sensingfeeling.models;
> import java.io.Serializable;
>
> public class Sensor implements Serializable {
>
>   private static final long serialVersionUID = 8582433437601788991L;
>   public String sensorId;
>   public String companyId;
>   public String label;
>   // public Date createdAt;
>   // public Date updatedAt;
>   public Integer uncomfortableFaceLimit;
>   public Boolean online;
>   public String capabilityId;
>   // public Date lastOnlineAt;
>   // public Date lastOfflineAt;
>   public Integer onlineVersionNumber;
>   public int status;
>   @Override
>   public String toString(){
>   return this.sensorId + " - " + this.label;
>   }
> }
>
> Super simple really.
>
> I'm not trying to upgrade anything as far as I know. Just making an
> operator state aware.
>
> Many thanks,
> Chris Stevens
> Head of Research & Development
> +44 7565 034 595
>
>
> On Wed, 19 Feb 2020 at 11:55, Timo Walther  

Re: Parallelize Kafka Deserialization of a single partition?

2020-02-19 Thread Till Rohrmann
I have to correct myself. DataStream respects the
ExecutionConfig.enableObjectReuse which happens in the form of creating
different Outputs in the OperatorChain. This is also in line with the
different behaviour you are observing.

Concerning your initial question Theo, you could do the following if you
are sure that none of your operators keeps a reference to the created Pojo,
the Pojo is not directly stored in state and there is no AsyncI/O operator
working on the Pojo: In the operator where you do the parsing you can
simply keep a reference to a single instance of the POJO and reuse it
whenever you process a new event. That way you should avoid the creation of
new instances. However, keep in mind that this is a potentially dangerous
operation if any of the before-mentioned conditions is violated.
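A rough sketch of what I mean, assuming a simple string-parsing map step (the
MyPojo type and the parsing logic are just placeholders):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Placeholder POJO; stands in for the parsed event type.
class MyPojo {
    public long timestamp;
    public String payload;
}

public class ReusingParser extends RichMapFunction<String, MyPojo> {

    // One instance per parallel subtask, reused for every incoming record.
    private transient MyPojo reused;

    @Override
    public void open(Configuration parameters) {
        reused = new MyPojo();
    }

    @Override
    public MyPojo map(String raw) {
        // Overwrite the fields in place instead of allocating a new object.
        // Only safe if downstream operators never hold on to the reference,
        // the object is not put into state, and no async operator touches it.
        int sep = raw.indexOf(',');
        reused.timestamp = Long.parseLong(raw.substring(0, sep));
        reused.payload = raw.substring(sep + 1);
        return reused;
    }
}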

Cheers,
Till

On Wed, Feb 19, 2020 at 12:29 PM Timo Walther  wrote:

> Hi Theo,
>
> there are a lot of performance improvements that Flink could do, but they
> would further complicate the interfaces and API. They would require deep
> knowledge of users about the runtime when it is safe to reuse object and
> when not.
>
> The Table/SQL API of Flink uses a lot of these optimization under the
> hood and works on binary data for reducing garbage collection.
>
> For the DataStream API, the community decided for safety/correctness
> before performance in this case. But disabling the object reuse and
> further low level optimization should give a good result if needed.
>
> Regards,
> Timo
>
>
> On 19.02.20 10:43, Theo Diefenthal wrote:
> > I have the same experience as Eleanore,
> >
> > When enabling object reuse, I saw a significant performance improvement
> > and in my profiling session. I saw that a lot of
> > serialization/deserialization was not performed any more.
> >
> > That’s why my question should originally aim a bit further: It’s good
> > that Flink reuses objects, but I still need to create a new instance of
> > my objects per event when parsed, which is ultimately dropped at some
> > processing step in the flink pipeline later on (map, shuffle or sink).
> > Wouldn’t it be possible that the “deserialize” method can have an
> > optional “oldPOJO” input where Flink provides me an unused old instance
> > of my POJO if it has one left? And if null, I instantiate a new instance
> > in my code? With billions of small events ingested per day, I can
> > imagine this to be another small performance improvement especially in
> > terms of garbage collection…
> >
> > Best regads
> >
> > Theo
> >
> > *From:*Till Rohrmann 
> > *Sent:* Mittwoch, 19. Februar 2020 07:34
> > *To:* Jin Yi 
> > *Cc:* user 
> > *Subject:* Re: Parallelize Kafka Deserialization of a single partition?
> >
> > Then my statement must be wrong. Let me double check this. Yesterday
> > when checking the usage of the objectReuse field, I could only see it in
> > the batch operators. I'll get back to you.
> >
> > Cheers,
> >
> > Till
> >
> > On Wed, Feb 19, 2020, 07:05 Jin Yi  > > wrote:
> >
> > Hi Till,
> >
> > I just read your comment:
> >
> > Currently, enabling object reuse via
> > ExecutionConfig.enableObjectReuse() only affects the DataSet API.
> > DataStream programs will always do defensive copies. There is a FLIP
> > to improve this behaviour [1].
> >
> > I have an application that is written in apache beam, but the runner
> > is flink, in the configuration of the pipeline, it is in streaming
> > mode, and I see performance difference between enable/disable
> > ObjectReuse, also when running in debugging mode, I noticed that
> > with objectReuse set to true, there is no
> > serialization/deserialization happening between operators, however,
> > when set to false, in between each operator, the serialization and
> > deserialization is happening. So do you have any idea why this is
> > happening?
> >
> > MyOptions options = PipelineOptionsFactory./as/(MyOptions.*class*);
> >
> > options.setStreaming(*true*);
> >
> > options.setObjectReuse(*true*);
> >
> > Thanks a lot!
> >
> > Eleanore
> >
> > On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann  > > wrote:
> >
> > Hi Theo,
> >
> > the KafkaDeserializationSchema does not allow to return
> > asynchronous results. Hence, Flink will always wait until
> > KafkaDeserializationSchema.deserialize returns the parsed value.
> > Consequently, the only way I can think of to offload the complex
> > parsing logic would be to do it in a downstream operator where
> > you could use AsyncI/O to run the parsing logic in a thread
> > pool, for example.
> >
> > Alternatively, you could think about a simple program which
> > transforms your input events into another format where it is
> > easier to extract the timestamp from. This comes, however, at
> > the cost of another Kafka 

Re: Flink Kafka connector consume from a single kafka partition

2020-02-19 Thread Arvid Heise
Hi Hemant,

Flink passes your configurations to the Kafka consumer, so you could check
if you can subscribe to only one partition there.

However, I would discourage that approach. I don't see the benefit over just
subscribing to the topic entirely and having dedicated processing for the
different devices.

If you are concerned about the order, you shouldn't. Since all events of a
specific device-id reside in the same source partition, events are in-order
in Kafka (responsibility of producer, but I'm assuming that because of your
mail) and thus they are also in order in non-keyed streams in Flink. Any
keyBy on device-id or composite key involving device-id, would also retain
the order.

If you have exactly one partition per device-id, you could even go with
`DataStreamUtils#reinterpretAsKeyedStream` to avoid any shuffling.
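A quick sketch of that last option (the DeviceEvent type and its field are
placeholders; note that the utility class is DataStreamUtils and the method is
marked experimental):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Placeholder event type with a device id.
class DeviceEvent {
    public String deviceId;
    public long value;
}

public class ReinterpretExample {
    public static KeyedStream<DeviceEvent, String> keyWithoutShuffle(DataStream<DeviceEvent> source) {
        // Tells Flink the stream is already partitioned by deviceId (one Kafka
        // partition per device), so no network shuffle is needed for keyed state.
        return DataStreamUtils.reinterpretAsKeyedStream(
            source,
            new KeySelector<DeviceEvent, String>() {
                @Override
                public String getKey(DeviceEvent e) {
                    return e.deviceId;
                }
            });
    }
}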

Let me know if I misunderstood your use case or if you have further
questions.

Best,

Arvid

On Wed, Feb 19, 2020 at 8:39 AM hemant singh  wrote:

> Hello Flink Users,
>
> I have a use case where I am processing metrics from different types of
> sources (one source will have multiple devices), and for aggregations as
> well as for building alerts the order of messages is important. To maintain
> customer data segregation I plan to have a single topic for each customer,
> with each source streaming data to one kafka partition.
> To maintain ordering I am planning to push data for a single source type
> to a single partition. Then I can create a keyedstream so that for each
> device-id I have a single stream with ordered data.
>
> However, with the flink-kafka consumer I don't see that I can read from a
> specific partition, hence the flink consumer reads from multiple kafka
> partitions. So even if I try to create a keyedstream on source type (and
> then write to a partition for further processing, like a keyedstream on
> device-id), I think ordering will not be maintained per source type.
>
> The only other option I feel I am left with is to have a single partition
> for the topic so that flink can subscribe to the topic and ordering is
> maintained; the challenge is too many topics (as I have this configuration
> for multiple customers), which is not advisable for a kafka cluster.
>
> Can anyone shed some light on how to handle this use case.
>
> Thanks,
> Hemant
>


Fwd: Re: Updating ValueState not working in hosted Kinesis

2020-02-19 Thread Timo Walther

Hi Chris,

[forwarding the private discussion to the mailing list again]

first of all, are you sure that your Sensor class is either a top-level 
class or a static inner class? Because it seems there is way more stuff 
in it (maybe included by accident transitively?), such as:


org.apache.logging.log4j.core.layout.AbstractCsvLayout
 > Serialization trace:
 > classes (sun.misc.Launcher$AppClassLoader)
 > classloader (java.security.ProtectionDomain)
 > cachedPDs (javax.security.auth.SubjectDomainCombiner)
 > combiner (java.security.AccessControlContext)
 > acc (sun.security.ssl.SSLSocketImpl)
 > connection (org.postgresql.core.PGStream)
 > pgStream (org.postgresql.core.v3.QueryExecutorImpl)
 > transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
 > commitQuery (org.postgresql.jdbc.PgConnection)
 > connection (org.postgresql.jdbc.PgResultSet)
 > val$rs

When declaring state you can use 
`org.apache.flink.api.common.typeinfo.Types#POJO(java.lang.Class)` to 
check if your state is a POJO type.
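
As a quick sanity check (a sketch, not from the original mails; it assumes the
state is declared in a rich function), Types.POJO throws an exception if the
class cannot be analyzed as a POJO, so the problem surfaces before the
descriptor silently falls back to Kryo:

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

// Fails fast if Sensor is not a valid POJO type.
TypeInformation<Sensor> sensorType = Types.POJO(Sensor.class);
ValueStateDescriptor<Sensor> descriptor =
    new ValueStateDescriptor<>("sensorState", sensorType);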


Regards,
Timo


 Forwarded Message 
Subject:Re: Updating ValueState not working in hosted Kinesis
Date:   Wed, 19 Feb 2020 12:02:16 +
From:   Chris Stevens 
To: Timo Walther 



Hi Timo,

Thanks for your reply. This makes sense to me: how do I treat something 
as a POJO instead of a generic serialized black-box type? Sorry, I'm relatively 
new to Java and Flink.


This is my full class def:

package sensingfeeling.models;
import java.io.Serializable;

public class Sensor implements Serializable {

     private static final long serialVersionUID = 8582433437601788991L;
     public String sensorId;
     public String companyId;
     public String label;
     // public Date createdAt;
     // public Date updatedAt;
     public Integer uncomfortableFaceLimit;
     public Boolean online;
     public String capabilityId;
     // public Date lastOnlineAt;
     // public Date lastOfflineAt;
     public Integer onlineVersionNumber;
     public int status;
     @Override
     public String toString(){
     return this.sensorId + " - " + this.label;
     }
}

Super simple really.

I'm not trying to upgrade anything as far as I know. Just making an 
operator state aware.


Many thanks,
Chris Stevens
Head of Research & Development
+44 7565 034 595


On Wed, 19 Feb 2020 at 11:55, Timo Walther > wrote:


Hi Chris,

it seems there are fields serialized into state that actually don't
belong there. You should aim to treat Sensor as a POJO instead of a Kryo
generic serialized black-box type.

Furthermore, it seems that fields such as
"org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be
state. Is there a "transient" keyword missing?

Are you trying to upgrade your job or the Flink version?

Regards,
Timo



On 18.02.20 18:59, Chris Stevens wrote:
 > Hi there,
 >
 > I'm trying to update state in one of my applications hosted in
Kinesis
 > Data Analytics.
 >
 > private transient ValueState sensorState;
 > using sensorState.update(sensor);
 >
 > Get error:
 >
 > An error occurred: org.apache.flink.util.FlinkRuntimeException:
Error
 > while adding data to RocksDB
 > at
 >

org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
 > at
 >

org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
 > at
 >

sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
 > at
 >

sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
 > at
 >

org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
 > at
 >

org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
 > at
 >

org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
 > at
 >

org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
 > at
 >

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
 > at
 >

org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
 > at
 >

org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
 > at
 >

org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
 > at
 >


Re: Table API: Joining on Tables of Complex Types

2020-02-19 Thread Timo Walther

Hi Andreas,

you are right, currently the Row type only supports accessing fields by 
index. Usually, we recommend to fully work in Table API. There you can 
access structured type fields by name (`SELECT row.field.field` or 
`'row.get("field").get("field")`) and additional utilities such as 
`flatten()`.


Can't you just use the schema of the table as a helper for bridging 
the names to indices?
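
A rough sketch of that idea (my assumption, not from the thread): build a
name-to-index map once from the TableSchema and use it when reading fields of
the Rows produced from that table.

import java.util.HashMap;
import java.util.Map;

// 'table' is the Table whose rows are later handed to the library users.
String[] fieldNames = table.getSchema().getFieldNames();
Map<String, Integer> fieldIndex = new HashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
    fieldIndex.put(fieldNames[i], i);
}

// Later, for a Row coming from this table:
Object value = row.getField(fieldIndex.get("someField"));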


Regards,
Timo


On 14.02.20 18:41, Hailu, Andreas wrote:

Hi Timo, Dawid,

This was very helpful - thanks! The Row type seems to only support getting 
fields by their index. Is there a way to get a field by its name like the Row 
class in Spark? Link: 
https://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/sql/Row.html#getAs(java.lang.String)

Our use case is that we're developing a data-processing library for developers 
leveraging our system to refine existing datasets and produce new ones. The 
flow is as follows:

Our library reads Avro/Parquet GenericRecord data files from a source and turns them 
into a Table --> users write a series of operations on this Table to create a new 
resulting Table --> the resulting Table is then transformed and persisted back to the file 
system as Avro GenericRecords in an Avro/Parquet file.

We can map the Row field names to their corresponding indexes by patching the 
AvroRowDeserializationSchema class, but it's the step where we expose 
the Table to our users and then try to persist it that ends up in this 
metadata loss. We know what fields the Table must be composed of, but we just 
won't know which index they live at, so Row#getField() isn't quite what we 
need.

// ah

-Original Message-
From: Timo Walther 
Sent: Friday, January 17, 2020 11:29 AM
To: user@flink.apache.org
Subject: Re: Table API: Joining on Tables of Complex Types

Hi Andreas,

if dataset.getType() returns a RowTypeInfo you can ignore this log message. The type 
extractor runs before the ".returns()" but with this method you override the 
old type.

Regards,
Timo


On 15.01.20 15:27, Hailu, Andreas wrote:

Dawid, this approach looks promising. I'm able to flatten out my Avro
records into Rows and run simple queries atop of them. I've got a
question - when I register my Rows as a table, I see the following log
providing a warning:

2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
org.apache.flink.types.Row does not contain a getter for field fields

2020-01-14 17:16:43,083 [main] INFO  TypeExtractor - class
org.apache.flink.types.Row does not contain a setter for field fields

2020-01-14 17:16:43,084 [main] INFO  TypeExtractor - Class class
org.apache.flink.types.Row cannot be used as a POJO type because not
all fields are valid POJO fields, and must be processed as GenericType.
Please read the Flink documentation on "Data Types & Serialization"
for details of the effect on performance.

Will this be problematic even now that we've provided TypeInfos for
the Rows? Performance is something that I'm concerned about as I've
already introduced a new operation to transform our records to Rows.

// ah

*From:* Hailu, Andreas [Engineering]
*Sent:* Wednesday, January 8, 2020 12:08 PM
*To:* 'Dawid Wysakowicz' ;
mailto:user@flink.apache.org
*Cc:* Richards, Adam S [Engineering] 
*Subject:* RE: Table API: Joining on Tables of Complex Types

Very well - I'll give this a try. Thanks, Dawid.

// ah

*From:* Dawid Wysakowicz mailto:dwysakow...@apache.org>>
*Sent:* Wednesday, January 8, 2020 7:21 AM
*To:* Hailu, Andreas [Engineering] mailto:andreas.ha...@ny.email.gs.com>>; mailto:user@flink.apache.org

*Cc:* Richards, Adam S [Engineering] mailto:adam.richa...@ny.email.gs.com>>
*Subject:* Re: Table API: Joining on Tables of Complex Types

Hi Andreas,

Converting your GenericRecords to Rows would definitely be the safest
option. You can check how its done in the
org.apache.flink.formats.avro.AvroRowDeserializationSchema. You can
reuse the logic from there to write something like:

  DataSet<GenericRecord> dataset = ...

  dataset.map( /* convert GenericRecord to Row */ )
      .returns(AvroSchemaConverter.convertToTypeInfo(avroSchemaString));

Another thing you could try is to make sure that GenericRecord is seen
as an Avro type by Flink (Flink should understand that the Avro type is a
complex type):

  dataset.returns(new GenericRecordAvroTypeInfo(/*schema string*/))

then the TableEnvironment should pick it up as a structured type and
flatten it automatically when registering the Table. Bear in mind the
returns method is part of SingleInputUdfOperator so you can apply it
right after some transformation e.g. map/flatMap etc.

Best,

Dawid

On 06/01/2020 18:03, Hailu, Andreas wrote:

 Hi David, thanks for getting back.

  From what you've said, I think we'll need to convert our
 GenericRecord into structured types - do you have any references or
 examples I can have a look at? If 

Re: How Do i Serialize a class using default kryo and protobuf in scala

2020-02-19 Thread Timo Walther

Hi,

would Apache Avro be an option for you? Because this is currently still 
the best supported format when it comes to schema upgrades as far as I 
know. Maybe Gordon in CC can give you some additional hints.
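
A minimal sketch of that direction (assuming a hypothetical Avro-generated
SpecificRecord class MyAvroEvent; not code from this thread): declaring the
state with an Avro type lets Flink use its Avro serializer, which supports
state schema evolution for Avro types since Flink 1.8.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Inside a rich function: adding a field with a default value to the Avro
// schema and regenerating MyAvroEvent should still restore from old savepoints.
ValueStateDescriptor<MyAvroEvent> descriptor =
    new ValueStateDescriptor<>("event-state", MyAvroEvent.class);
ValueState<MyAvroEvent> state = getRuntimeContext().getState(descriptor);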


Regards,
Timo


On 18.02.20 10:38, ApoorvK wrote:

I have some case classes which have primitives as well as nested class objects,
hence if I add any more variables to a class the savepoint does not restore. I read
that if I add a Kryo serializer on those classes using Google protobuf, I will be
able to serialize them from state. Can anyone please share an example in
Scala for the same?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Updating ValueState not working in hosted Kinesis

2020-02-19 Thread Timo Walther

Hi Chris,

it seems there are fields serialized into state that actually don't 
belong there. You should aim to treat Sensor as a POJO instead of a Kryo 
generic serialized black-box type.


Furthermore, it seems that fields such as 
"org.apache.logging.log4j.core.layout.AbstractCsvLayout" should not be 
state. Is there a "transient" keyword missing?
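
For illustration only (a sketch with hypothetical names, not code from this
job): runtime resources such as a JDBC connection belong in transient fields
that are re-created in open(), so they are never dragged into checkpointed
state or into the serialized user function.

import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichSensorFunction extends RichMapFunction<Sensor, Sensor> {

    // transient: rebuilt per task in open(), never serialized with the function
    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        connection = DriverManager.getConnection("jdbc:postgresql://host/db");
    }

    @Override
    public Sensor map(Sensor sensor) {
        // ... enrich the sensor using the connection ...
        return sensor;
    }
}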


Are you trying to upgrade your job or the Flink version?

Regards,
Timo



On 18.02.20 18:59, Chris Stevens wrote:

Hi there,

I'm trying to update state in one of my applications hosted in Kinesis 
Data Analytics.


private transient ValueState sensorState;
using sensorState.update(sensor);

Get error:

An error occurred: org.apache.flink.util.FlinkRuntimeException: Error 
while adding data to RocksDB
at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:108)
at 
org.apache.flink.runtime.state.ttl.TtlValueState.update(TtlValueState.java:50)
at 
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:97)
at 
sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction.join(FrameMotionPathsToTelemetryJoinFunction.java:48)
at 
org.apache.flink.streaming.api.datastream.JoinedStreams$JoinCoGroupFunction.coGroup(JoinedStreams.java:460)
at 
org.apache.flink.streaming.api.datastream.CoGroupedStreams$CoGroupWindowFunction.apply(CoGroupedStreams.java:777)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:255)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:775)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:308)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:714)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.IllegalArgumentException: Unable to create serializer 
"com.esotericsoftware.kryo.serializers.FieldSerializer" for class: 
org.apache.logging.log4j.core.layout.AbstractCsvLayout

Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
cachedPDs (javax.security.auth.SubjectDomainCombiner)
combiner (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
connection (org.postgresql.core.PGStream)
pgStream (org.postgresql.core.v3.QueryExecutorImpl)
transferModeRegistry (org.postgresql.core.v3.SimpleQuery)
commitQuery (org.postgresql.jdbc.PgConnection)
connection (org.postgresql.jdbc.PgResultSet)
val$rs 
(sensingfeeling.functions.mapping.FrameMotionPathsToTelemetryJoinFunction$4)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)

at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:88)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)

at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
at 
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
at 

Re: CSV StreamingFileSink

2020-02-19 Thread Timo Walther

Hi Austin,

the StreamingFileSink allows bucketing the output data.

This should help for your use case:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html#bucket-assignment
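
A minimal sketch of daily directories with the built-in assigner (assuming a
DataStream<String> of already CSV-formatted lines called csvLines; the output
path is a placeholder):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;

// Records are routed into <base>/2020-02-18/, <base>/2020-02-19/, ... buckets
// based on processing time; a custom BucketAssigner can use event time instead.
StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("/tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd"))
    .build();
csvLines.addSink(sink);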

Regards,
Timo


On 19.02.20 01:00, Austin Cawley-Edwards wrote:
Following up on this -- does anyone know if it's possible to stream 
individual files to a directory using the StreamingFileSink? For 
instance, if I want all records that come in during a certain day to be 
partitioned into daily directories:


2020-02-18/
    large-file-1.txt
    large-file-2.txt
2020-02-19/
    large-file-3.txt

Or is there another way to accomplish this?

Thanks!
Austin

On Tue, Feb 18, 2020 at 5:33 PM Austin Cawley-Edwards 
mailto:austin.caw...@gmail.com>> wrote:


Hey all,

Has anyone had success using the StreamingFileSink[1] to write CSV
files? And if so, what about compressed (Gzipped, ideally) files/
which libraries did you use?


Best,
Austin


[1]:

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html





Re: Parallelize Kafka Deserialization of a single partition?

2020-02-19 Thread Timo Walther

Hi Theo,

there are a lot of performance improvements that Flink could do, but they 
would further complicate the interfaces and API. They would require deep 
knowledge from users about the runtime and about when it is safe to reuse an 
object and when not.


The Table/SQL API of Flink uses a lot of these optimizations under the 
hood and works on binary data for reducing garbage collection.


For the DataStream API, the community decided for safety/correctness 
over performance in this case. But enabling object reuse and 
further low-level optimizations should give a good result if needed.
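
As a small sketch (the usual opt-in, not taken from this thread): object reuse
is enabled on the execution config, and once enabled, user functions must not
cache or mutate input records across invocations.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Skip defensive copies between chained operators.
env.getConfig().enableObjectReuse();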


Regards,
Timo


On 19.02.20 10:43, Theo Diefenthal wrote:

I have the same experience as Eleanore.

When enabling object reuse, I saw a significant performance improvement, 
and in my profiling session I saw that a lot of 
serialization/deserialization was not performed any more.


That’s why my question originally aimed a bit further: It’s good 
that Flink reuses objects, but I still need to create a new instance of 
my objects per parsed event, which is ultimately dropped at some 
processing step in the Flink pipeline later on (map, shuffle or sink). 
Wouldn’t it be possible for the “deserialize” method to have an 
optional “oldPOJO” input where Flink provides me an unused old instance 
of my POJO if it has one left? And if it is null, I instantiate a new instance 
in my code? With billions of small events ingested per day, I can 
imagine this to be another small performance improvement, especially in 
terms of garbage collection…


Best regards

Theo

*From:*Till Rohrmann 
*Sent:* Mittwoch, 19. Februar 2020 07:34
*To:* Jin Yi 
*Cc:* user 
*Subject:* Re: Parallelize Kafka Deserialization of a single partition?

Then my statement must be wrong. Let me double check this. Yesterday 
when checking the usage of the objectReuse field, I could only see it in 
the batch operators. I'll get back to you.


Cheers,

Till

On Wed, Feb 19, 2020, 07:05 Jin Yi > wrote:


Hi Till,

I just read your comment:

Currently, enabling object reuse via
ExecutionConfig.enableObjectReuse() only affects the DataSet API.
DataStream programs will always do defensive copies. There is a FLIP
to improve this behaviour [1].

I have an application that is written in Apache Beam, but the runner
is Flink. In the configuration of the pipeline it is in streaming
mode, and I see a performance difference between enabling and disabling
ObjectReuse. Also, when running in debugging mode, I noticed that
with objectReuse set to true, there is no
serialization/deserialization happening between operators; however,
when set to false, the serialization and
deserialization happens between each operator. Do you have any idea why this is
happening?

MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
options.setStreaming(true);
options.setObjectReuse(true);

Thanks a lot!

Eleanore

On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann mailto:trohrm...@apache.org>> wrote:

Hi Theo,

the KafkaDeserializationSchema does not allow to return
asynchronous results. Hence, Flink will always wait until
KafkaDeserializationSchema.deserialize returns the parsed value.
Consequently, the only way I can think of to offload the complex
parsing logic would be to do it in a downstream operator where
you could use AsyncI/O to run the parsing logic in a thread
pool, for example.

Alternatively, you could think about a simple program which
transforms your input events into another format where it is
easier to extract the timestamp from. This comes, however, at
the cost of another Kafka topic.

Currently, enabling object reuse via
ExecutionConfig.enableObjectReuse() only affects the DataSet
API. DataStream programs will always do defensive copies. There
is a FLIP to improve this behaviour [1].

[1]

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982

Cheers,

Till

On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal
mailto:theo.diefent...@scoop-software.de>> wrote:

Hi,

As for most pipelines, our Flink pipeline starts with parsing
source Kafka events into POJOs. We perform this step within
a KafkaDeserializationSchema so that we properly extract the
event time timestamp for the downstream timestamp assigner.

Now it turned out that parsing is currently the most CPU
intensive task in our pipeline and thus CPU bounds the
number of elements we can ingest per second. Further
splitting up the partitions will be hard as we need to
maintain the exact order of events per partition and would
also require quite some architectural changes for producers
 

Re: BucketingSink capabilities for DataSet API

2020-02-19 Thread Timo Walther

Hi Anuj,

another option would be to use the new Hive connectors. Have you looked 
into those? They might work on SQL internal data types which is why you 
would need to use the Table API then.


Maybe Bowen in CC can help you here.

Regards,
Timo

On 19.02.20 11:14, Rafi Aroch wrote:

Hi Anuj,

It's been a while since I wrote this (Flink 1.5.2). Could be a 
better/newer way, but this is how I read & write Parquet with 
hadoop-compatibility:


// imports
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; 


import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetInputFormat;

// Creating Parquet input format
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
AvroParquetInputFormat<>();
AvroParquetInputFormat.setInputDirRecursive(job, true);
AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
HadoopInputFormat<Void, GenericRecord> inputFormat
= HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
GenericRecord.class, job);

// Creating Parquet output format
AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
AvroParquetOutputFormat<>();
AvroParquetOutputFormat.setSchema(job, new
Schema.Parser().parse(SomeEvent.SCHEMA));
AvroParquetOutputFormat.setCompression(job,
CompressionCodecName.SNAPPY);
AvroParquetOutputFormat.setCompressOutput(job, true);
AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
HadoopOutputFormat<Void, GenericRecord> outputFormat = new
HadoopOutputFormat<>(parquetOutputFormat, job);


DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
env.createInput(inputFormat);

// Start processing... 


// Writing result as Parquet
resultDataSet.output(outputFormat);


Regarding writing partitioned data, as far as I know, there is no way to 
achieve that with the DataSet API with hadoop-compatibility.


You could implement this with reading from input files as stream and 
then using StreamingFileSink with a custom BucketAssigner [1].
The problem with that (which was not yet resolved AFAIK) is described 
here [2] in "Important Notice 2".


Sadly, I have to say that eventually, for this use case, I chose Spark to do 
the job...


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html

[2]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general

Hope this helps.

Rafi


On Sat, Feb 15, 2020 at 5:03 PM aj > wrote:


Hi Rafi,

I have a similar use case where I want to read Parquet files into a
dataset, perform some transformations, and similarly write
the result partitioned by year, month and day.

I am stuck at the first step: how to read and write
Parquet files using hadoop-compatibility.

Please help me with this, and also let me know if you find a solution
for how to write partitioned data.

Thanks,
Anuj

On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin
mailto:and...@data-artisans.com>> wrote:

Hi Rafi,

At the moment I do not see any support of Parquet in DataSet API
except HadoopOutputFormat, mentioned in stack overflow question.
I have cc’ed Fabian and Aljoscha, maybe they could provide more
information.

Best,
Andrey


On 25 Oct 2018, at 13:08, Rafi Aroch mailto:rafi.ar...@gmail.com>> wrote:

Hi,

I'm writing a Batch job which reads Parquet, does some
aggregations and writes back as Parquet files.
I would like the output to be partitioned by year, month, day
by event time. Similarly to the functionality of the
BucketingSink.

I was able to achieve the reading/writing to/from Parquet by
using the hadoop-compatibility features.
I couldn't find a way to partition the data by year, month,
day to create a folder hierarchy accordingly. Everything is
written to a single directory.

I could find an unanswered question about this issue:

https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit

Can anyone suggest a way to achieve this? Maybe there's a way
to integrate the BucketingSink with the DataSet API? Another
solution?

Rafi




-- 
Thanks & Regards,

Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07









Re: BucketingSink capabilities for DataSet API

2020-02-19 Thread Rafi Aroch
Hi Anuj,

It's been a while since I wrote this (Flink 1.5.2). Could be a better/newer
way, but this is how I read & write Parquet with hadoop-compatibility:

// imports
> import org.apache.avro.generic.GenericRecord;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
>
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;

import org.apache.flink.hadoopcompatibility.HadoopInputs;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.parquet.avro.AvroParquetInputFormat;
>
> // Creating Parquet input format
> Configuration conf = new Configuration();
> Job job = Job.getInstance(conf);
> AvroParquetInputFormat<GenericRecord> parquetInputFormat = new
> AvroParquetInputFormat<>();
> AvroParquetInputFormat.setInputDirRecursive(job, true);
> AvroParquetInputFormat.setInputPaths(job, pathsToProcess);
> HadoopInputFormat<Void, GenericRecord> inputFormat
> = HadoopInputs.createHadoopInput(parquetInputFormat, Void.class,
> GenericRecord.class, job);
>


> // Creating Parquet output format
> AvroParquetOutputFormat<GenericRecord> parquetOutputFormat = new
> AvroParquetOutputFormat<>();
> AvroParquetOutputFormat.setSchema(job, new
> Schema.Parser().parse(SomeEvent.SCHEMA));
> AvroParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
> AvroParquetOutputFormat.setCompressOutput(job, true);
> AvroParquetOutputFormat.setOutputPath(job, new Path(pathString));
> HadoopOutputFormat<Void, GenericRecord> outputFormat = new
> HadoopOutputFormat<>(parquetOutputFormat, job);



DataSource<Tuple2<Void, GenericRecord>> inputFileSource =
> env.createInput(inputFormat);



// Start processing...



// Writing result as Parquet
> resultDataSet.output(outputFormat);


Regarding writing partitioned data, as far as I know, there is no way to
achieve that with the DataSet API with hadoop-compatibility.

You could implement this with reading from input files as stream and then
using StreamingFileSink with a custom BucketAssigner [1].
The problem with that (which was not yet resolved AFAIK) is described here
[2] in "Important Notice 2".

Sadly, I have to say that eventually, for this use case, I chose Spark to do
the job...

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html#general

Hope this helps.

Rafi


On Sat, Feb 15, 2020 at 5:03 PM aj  wrote:

> Hi Rafi,
>
> I have a similar use case where I want to read Parquet files into a
> dataset, perform some transformations, and similarly write the result
> partitioned by year, month and day.
>
> I am stuck at the first step: how to read and write Parquet files
> using hadoop-compatibility.
>
> Please help me with this, and also let me know if you find a solution for how
> to write partitioned data.
>
> Thanks,
> Anuj
>
>
> On Thu, Oct 25, 2018 at 5:35 PM Andrey Zagrebin 
> wrote:
>
>> Hi Rafi,
>>
>> At the moment I do not see any support of Parquet in DataSet API
>> except HadoopOutputFormat, mentioned in stack overflow question. I have
>> cc’ed Fabian and Aljoscha, maybe they could provide more information.
>>
>> Best,
>> Andrey
>>
>> On 25 Oct 2018, at 13:08, Rafi Aroch  wrote:
>>
>> Hi,
>>
>> I'm writing a Batch job which reads Parquet, does some aggregations and
>> writes back as Parquet files.
>> I would like the output to be partitioned by year, month, day by event
>> time. Similarly to the functionality of the BucketingSink.
>>
>> I was able to achieve the reading/writing to/from Parquet by using the
>> hadoop-compatibility features.
>> I couldn't find a way to partition the data by year, month, day to create
>> a folder hierarchy accordingly. Everything is written to a single directory.
>>
>> I could find an unanswered question about this issue:
>> https://stackoverflow.com/questions/52204034/apache-flink-does-dataset-api-support-writing-output-to-individual-file-partit
>>
>> Can anyone suggest a way to achieve this? Maybe there's a way to
>> integrate the BucketingSink with the DataSet API? Another solution?
>>
>> Rafi
>>
>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> 
>
>
> 
>


Re: Exceptions in Web UI do not appear in logs

2020-02-19 Thread Robert Metzger
Thanks a lot for reporting this issue.
Which version of Flink are you using?

I checked the code of the Kinesis ShardConsumer (the current version
though), and I found that exceptions from the ShardConsumer are properly
forwarded to the lower level runtime.

Did you check the *.out files of the TaskManager where the exception
occurred as well?
If you find the exception somewhere else, I'd be curious to see a longer
stack trace (if there's one)


On Thu, Feb 6, 2020 at 5:59 PM Ori Popowski  wrote:

> In the "Exceptions" tab in the Web UI, we see exceptions that for some
> reason do not appear in the logs.
>
> Specifically the exception I'm talking about is a Runtime Exception which
> is not logged by the code, so it's essentially uncaught exception that is
> propagated to the Task Manager.
>
> We use SLF4J with Logback, and all our appenders and log levels are
> configured correctly. Some exceptions do appear in the logs though.
>
> This is the exception:
>
> java.lang.RuntimeException: Rate Exceeded for getRecords operation - all 3 
> retry attempts returned ProvisionedThroughputExceededException.
>   at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:234)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:311)
>   at 
> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:219)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> Does someone know how to make all the exceptions in the Web UI also appear
> in the logs?
>
> Thanks
>
>
>


RE: Parallelize Kafka Deserialization of a single partition?

2020-02-19 Thread Theo Diefenthal
I have the same experience as Eleanore.



When enabling object reuse, I saw a significant performance improvement, and 
in my profiling session I saw that a lot of serialization/deserialization 
was not performed any more.



That’s why my question originally aimed a bit further: It’s good that 
Flink reuses objects, but I still need to create a new instance of my 
objects per parsed event, which is ultimately dropped at some 
processing step in the Flink pipeline later on (map, shuffle or sink). 
Wouldn’t it be possible for the “deserialize” method to have an optional 
“oldPOJO” input where Flink provides me an unused old instance of my POJO if 
it has one left? And if it is null, I instantiate a new instance in my code? With 
billions of small events ingested per day, I can imagine this to be another 
small performance improvement, especially in terms of garbage collection…



Best regards

Theo



From: Till Rohrmann 
Sent: Mittwoch, 19. Februar 2020 07:34
To: Jin Yi 
Cc: user 
Subject: Re: Parallelize Kafka Deserialization of a single partition?



Then my statement must be wrong. Let me double check this. Yesterday when 
checking the usage of the objectReuse field, I could only see it in the 
batch operators. I'll get back to you.



Cheers,

Till



On Wed, Feb 19, 2020, 07:05 Jin Yi mailto:eleanore@gmail.com> > wrote:

Hi Till,

I just read your comment:

Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() 
only affects the DataSet API. DataStream programs will always do defensive 
copies. There is a FLIP to improve this behaviour [1].



I have an application that is written in Apache Beam, but the runner is 
Flink. In the configuration of the pipeline it is in streaming mode, and I 
see a performance difference between enabling and disabling ObjectReuse. Also, 
when running in debugging mode, I noticed that with objectReuse set to true 
there is no serialization/deserialization happening between operators; 
however, when set to false, the serialization and deserialization happens 
between each operator. Do you have any idea why this is happening?

MyOptions options = PipelineOptionsFactory.as(MyOptions.class);
options.setStreaming(true);
options.setObjectReuse(true);

Thanks a lot!
Eleanore



On Tue, Feb 18, 2020 at 6:05 AM Till Rohrmann mailto:trohrm...@apache.org> > wrote:

Hi Theo,



the KafkaDeserializationSchema does not allow to return asynchronous 
results. Hence, Flink will always wait until 
KafkaDeserializationSchema.deserialize returns the parsed value. 
Consequently, the only way I can think of to offload the complex parsing 
logic would be to do it in a downstream operator where you could use 
AsyncI/O to run the parsing logic in a thread pool, for example.
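
A rough sketch of that downstream approach (not from the original mails; MyEvent
and parse() are hypothetical): keep the Kafka deserialization minimal and run
the heavy parsing in an ordered async operator backed by a thread pool.

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

DataStream<byte[]> raw = env.addSource(kafkaConsumer); // timestamp-only deserialization

DataStream<MyEvent> parsed = AsyncDataStream.orderedWait(
    raw,
    new RichAsyncFunction<byte[], MyEvent>() {
        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            pool = Executors.newFixedThreadPool(4);
        }

        @Override
        public void asyncInvoke(byte[] bytes, ResultFuture<MyEvent> result) {
            // Heavy parsing runs off the task thread; orderedWait preserves order.
            pool.submit(() -> result.complete(Collections.singleton(parse(bytes))));
        }
    },
    30, TimeUnit.SECONDS, 100);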



Alternatively, you could think about a simple program which transforms your 
input events into another format where it is easier to extract the timestamp 
from. This comes, however, at the cost of another Kafka topic.



Currently, enabling object reuse via ExecutionConfig.enableObjectReuse() 
only affects the DataSet API. DataStream programs will always do defensive 
copies. There is a FLIP to improve this behaviour [1].



[1] 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982



Cheers,

Till



On Mon, Feb 17, 2020 at 1:14 PM Theo Diefenthal 
mailto:theo.diefent...@scoop-software.de> > wrote:

Hi,



As for most pipelines, our Flink pipeline starts with parsing source Kafka 
events into POJOs. We perform this step within a KafkaDeserializationSchema so 
that we properly extract the event time timestamp for the downstream 
timestamp assigner.



Now it turned out that parsing is currently the most CPU intensive task in 
our pipeline and thus CPU bounds the number of elements we can ingest per 
second. Further splitting up the partitions will be hard as we need to 
maintain the exact order of events per partition and would also require 
quite some architectural changes for producers and the Flink job.



Now I had the idea to put the parsing task into ordered Async I/O. But 
Async I/O can only be plugged into an existing stream, not into the 
deserialization schema, as far as I see. So the best idea I currently have 
is to keep parsing in the DeserializationSchema as minimal as possible to 
extract the event timestamp and do the full parsing downstream in Async I/O. 
This, however, seems to be a bit tedious, especially as we have to deal with 
multiple input formats and would need to develop two parsers for the 
heavy-load ones: a timestamp-only parser and a full parser.



Do you know if it is somehow possible to parallelize or use Async I/O for the 
parsing within the KafkaDeserializationSchema? I don't have state access in 
there, and I don't have a "collector" object in there, so one input element 
needs to produce exactly one output element.



Another question: My parsing produces Java POJO objects via "new", which are 
sent downstream (reusePOJO setting set) and finally will