Unable to read S3 data using the filesystem connector

2021-12-28 Thread Rohan Kumar
Hello,

I tried to read Parquet data in S3 using the filesystem connector but got
the error below. The jobmanager is not starting.
I tried the standalone-job mode in Docker.
I have already included flink-s3-fs-hadoop and flink-s3-fs-presto as
plugins and they are working fine for checkpointing and Kubernetes HA. The
issue only appears when I am reading files from S3 using the Table API connector.
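
For reference, a minimal sketch of the kind of Table API job that hits this
(the table name, columns, and bucket path below are placeholders, not my real
schema):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class S3ParquetReadJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // Filesystem connector table backed by Parquet files in S3.
        tEnv.executeSql(
                "CREATE TABLE events ("
                        + "  id STRING,"
                        + "  ts TIMESTAMP(3)"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 's3://my-bucket/events/',"
                        + "  'format' = 'parquet'"
                        + ")");

        // Planning this query is where ParquetFileFormatFactory tries to load
        // org.apache.hadoop.conf.Configuration and the jobmanager fails.
        tEnv.executeSql("SELECT * FROM events").print();
    }
}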

Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.conf.Configuration
at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown
Source) ~[?:?]
at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:116)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:79)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:67)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at
org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:182)
~[flink-table_2.12-1.14.2.jar:1.14.2]
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.Iterator.foreach(Iterator.scala:943)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.Iterator.foreach$(Iterator.scala:943)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-28 Thread Dong Lin
Hi Arvid,

After discussing with Jiangjie offline, I agree that using Collector::close() is
not appropriate, because we generally prefer close() to be called by one
entity, in this case the Flink runtime. Having close() called by both the user
and the Flink runtime could be error-prone, even though we could make it work
with some extra effort.

I am now thinking about adding a public class (e.g. SourceCollector) that
extends the existing Collector. SourceCollector::endOfStream() could be
invoked by users to signal EOF. And users could optionally implement a
KafkaRecordDeserializationSchema::deserialize(SourceCollector) if they want
to do dynamic EOF.
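
For illustration only, a rough sketch of what such a SourceCollector might look
like (the name and shape are just my current idea and would be finalized in the
FLIP; nothing below exists in Flink today):

import org.apache.flink.util.Collector;

public interface SourceCollector<T> extends Collector<T> {

    /**
     * Invoked by the user's deserialization logic to signal that the current
     * source split has reached its (dynamic) end of stream.
     */
    void endOfStream();
}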

To make our discussion more efficient and to involve more people in the
review, I will create a FLIP and open a discussion thread for it.

Thanks,
Dong



On Tue, Dec 28, 2021 at 7:40 PM Dong Lin  wrote:

> Hi Arvid,
>
> Thanks a lot for the detailed reply.
>
> Just to clarify, I don't plan to ask users to implement
> KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
> not add any public API, but expect users to re-use the
> existing Collector::close() API inside
> KafkaRecordDeserializationSchema::deserialize(...). And if a message with
> the user-specified pattern has arrived, the user can invoke
> Collector::close() which signals Flink to stop reading from the
> corresponding source split.
>
> Here are a few clarifications in response to the discussion:
>
> 1) The boundedness of the source and execution.runtime-mode would not be
> affected by this proposal. Users can keep using the existing setting
> with or without the dynamic EOF.
> 2) The dynamic EOF works independently of the
> stoppingOffsetsInitializer. When reading from Kafka Source, users can
> rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
> And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
> job stops reading from the source split when either condition is met.
> 3) Suppose users can specify the dynamic EOF in
> KafkaRecordDeserializationSchema::deserialize(...), then users have access
> to the entire ConsumerRecord. This approach could address Ayush's use-case.
> 4) Suppose we choose to do it in
> KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
> happens inside the RecordEmitter. Yes we will need to be able to close the
> split.
> 5) For the majority of users who do not want dynamic EOF, those users can
> keep using the existing out-of-the-box support for Avro/Json/Protobuf.
> For advanced users who want dynamic EOF, those users anyway need to encode
> the dynamic EOF logic in a method similar to
> KafkaRecordDeserializationSchema (with access to the raw message). Adding
> the dynamic EOF support would not make their life harder.
>
> Based on the discussion so far, it looks like two approaches have been
> mentioned:
>
> 1) Let users call Collector::close() API inside
> KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.
>
> 2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor), where
> StopCursor subsumes all existing functionalities of
> the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
> both the raw and the deserialized message.
>
> It seems that the second approach involves much more API change than the
> first one (including the deprecation of some existing APIs).
>
> Regarding the first approach, could you help explain why "close is the
> completely wrong method for that"? My understanding is the close() method
> indicates that the caller no longer needs to read from this source split
> and the associated network resource could be released. Why is it wrong for
> a user to call this method?
>
>
> On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise  wrote:
>
>> Hi Dong,
>>
>> Could you help explain why we can not dynamically stop reading from a
>>> source in batch mode?
>>>
>> We can, but we cannot easily determine whether the source is supposed to run in
>> batch or streaming. A user would need to implement a special
>> KafkaRecordDeserializationSchema and still provide an OffsetInitializer for
>> the end offset to trigger batch mode.
>>
>> How are both concepts supposed to interact? Are we only stopping if any
>> of the concepts states that this is the end?
>>
>> We could ofc offer some KafkaSourceBuilder#setBounded() without parameters
>> so that a user can implement a special KafkaRecordDeserializationSchema and
>> notify the builder but this looks awkward to me and is quite error-prone:
>> When a user uses setBounded without overriding isEndOfStream, the
>> application would never emit anything.
>>
>> My understanding is that when a message with the particular pattern
>>> (specified by the user) is encountered, we can have the source operator
>>> emit the high-watermark in such a way as if the particular partition of
>>> this source has reached EOF. And this must have worked since users have
>>> been using KafkaDeserializationSchema::isEndOfStre

Error while deploying from snapshot after adding new column in existing table

2021-12-28 Thread shamit jain
Hello Experts,


I need help to understand whether we can deploy a job from a snapshot after
changing the code by adding one new column to an existing table.

We are using the Flink 1.13.2 Table API and RocksDB as the state backend. We have
changed the code, added one new column to the existing table, and later tried to
deploy from the snapshot. While deploying, I'm getting the error below:


Caused by: org.apache.flink.util.StateMigrationException: The new state
serializer
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@957c5336) must
not be incompatible with the old state serializer
(org.apache.flink.table.runtime.typeutils.RowDataSerializer@cfe79aaf).
   at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:704)
   at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
   at
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
   at
org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
   at
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
   at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
   at
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
   at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
   at
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
   ...


After troubleshooting, I found we are getting this error while comparing
the fields of the previous and the latest table definitions.

This comparison happens in the flink-table-runtime library, class
org.apache.flink.table.runtime.typeutils.RowDataSerializer, method
resolveSchemaCompatibility():

    if (!Arrays.equals(previousTypes, newRowSerializer.types)) {
        return TypeSerializerSchemaCompatibility.incompatible();
    }


Can you please help me understand whether we can add a new column to an
existing table and then deploy from the snapshot?

Regards,
Shamit Jain


TypeInformation | Flink

2021-12-28 Thread Siddhesh Kalgaonkar
Hi Team,

I am a newbie to Flink and Scala and trying my best to learn everything I
can. I am doing a practice exercise where I am getting incoming JSON data from a
Kafka topic and want to perform a data type check on it.
For that, I came across Flink's TypeInformation. Please read my problem in
detail at the link below:

Flink Problem


I went through the documentation but didn't come across any relevant
examples. Any suggestions would help.

Looking forward to hearing from you.


Thanks,
Siddhesh


RE: Re: Read parquet data from S3 with Flink 1.12

2021-12-28 Thread Rohan Kumar
Hi Alexandre, I am also facing the same issue. Please let us know if you
are able to find anything.

Thanks

On 2021/12/27 02:11:01 Alexandre Montecucco wrote:
> Hi Seth,
> Thank you for confirming the issue due to the transition in 1.14.
> For now, given my constraints, I will do a simple workaround and download
> the whole dataset with java aws library.
>
> For future reference though I would like to solve this
> I am actually still on 1.12 at the moment and had some issues with
> simply using flink-parquet.
> I think I would have the same issue with 1.14. The root issue is really
> around the Hadoop library.
>
> If I simply add the `flink-parquet` library as specified in the doc, it does not
> compile because of a class-not-found error for
> `org.apache.hadoop.conf.Configuration`.
> If I add `hadoop-common` and mark it as provided, it fails with a class-not-found
> error at runtime.
> If I bundle Hadoop with my application jar, then it crashes with a
> "filesystem not found" error for `s3`.
>
> Did I miss anything in the doc?
>
> Alex
>
> On Tue, Dec 21, 2021 at 10:29 PM Seth Wiesman  wrote:
>
> > Hi Alexandre,
> >
> > You are correct, BatchTableEnvironment does not exist in 1.14 anymore. In
> > 1.15 we will have the state processor API ported to DataStream for exactly
> > this reason, it is the last piece to begin officially marking DataSet as
> > deprecated. As you can understand, this has been a multi-year process and
> > there have been some rough edges as components are migrated.
> >
> > The easiest solution is for you to use 1.12 DataSet <-> Table interop. Any
> > savepoint you create using Flink 1.12 you should be able to restore on a
> > 1.14 DataStream application.
> >
> > I am unsure of the issue with the Hadoop plugin, but if using 1.14 is a
> > hard requirement, rewriting your input data into another format could also
> > be a viable stop-gap solution.
> >
> > Seth
> >
> > On Mon, Dec 20, 2021 at 8:57 PM Alexandre Montecucco <
> > alexandre.montecu...@grabtaxi.com> wrote:
> >
> >> Hello,
> >>
> >> I also face the same issue as documented in a previous mail from the
> >> mailing list [1]
> >> Basically when using flink-parquet, I get:
> >>
> >>>  java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
> >>
> >> I have no idea what I need to do to fix this and could not find anything
> >> from the doc. I tried importing various hadoop libraries, but it always
> >> causes yet another issue.
> >>
> >> I think this might be the root cause of my problem.
> >>
> >> Best,
> >> Alex
> >>
> >> [1] https://lists.apache.org/thread/796m8tww4gqykqm1szb3y5m7t6scgho2
> >>
> >> On Mon, Dec 20, 2021 at 4:23 PM Alexandre Montecucco <
> >> alexandre.montecu...@grabtaxi.com> wrote:
> >>
> >>> Hello Piotrek,
> >>> Thank you for the help.
> >>> Regarding the S3 issue I have followed the documentation for the
> >>> plugins. Many of our other apps are using S3 through the Hadoop Fs Flink
> >>> plugin.
> >>> Also, in this case, just reading regular plain text file works, I only
> >>> have an issue when using Parquet.
> >>>
> >>> I tried switching to Flink 1.14, however I am stumbling upon other
> >>> blockers.
> >>> To give more context, I am trying to build a Flink savepoint for cold
> >>> start data. So I am using the Flink State Processor API. But:
> >>>  -  Flink State Processor API is using the DataSet api which is now
> >>> marked as deprecated (Legacy)
> >>>  - the doc you shared regarding reading from Parquet uses the DataStream
> >>> API
> >>>  - the Flink State Processor API doc [1] states there is interoperability
> >>> of DataSet and Table API
> >>> <https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api>
> >>> (but the link is now erroneous), it was last correct in Flink 1.12 [2]
> >>>
> >>> Given that we can convert from DataStream to Table API, I was thinking I
> >>> could then convert from Table to DataSet API (though very cumbersome and
> >>> unsure if any performance / memory impact).
> >>> But for the Table to DataSet conversion, the doc is using a
> >>> BatchTableEnvironment class which does not seem to exist in Flink 1.14 anymore
> >>>
> >>> Any recommendations or anything I might have missed?
> >>>
> >>> Thank you.
> >>>
> >>> Best,
> >>> Alex
> >>>
> >>>
> >>> [1]
> >>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/#state-processor-api
> >>> [2]
> >>> https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/common.html#integration-with-datastream-and-dataset-api
> >>> [3]
> >>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/formats/parquet/
> >>>
> >>>
> >>> On Fri, Dec 17, 2021 at 8:53 PM Piotr Nowojski 
> >>> wrote:
> >>>
>  Hi,
> 
>  Reading in the DataStream API (that's what I'm assuming you are doing)
>  from 

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-28 Thread Dong Lin
Hi Arvid,

Thanks a lot for the detailed reply.

Just to clarify, I don't plan to ask users to implement
KafkaRecordDeserializationSchema#isEndOfStream(T). My current idea is to
not add any public API, but expect users to re-use the
existing Collector::close() API inside
KafkaRecordDeserializationSchema::deserialize(...). And if a message with
the user-specified pattern has arrived, the user can invoke
Collector::close() which signals Flink to stop reading from the
corresponding source split.
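
To make the idea concrete, here is a rough sketch of such a deserializer (the
end-marker string and the String payload type are made up for illustration;
whether out.close() should be used this way is exactly what is under
discussion):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class DynamicEofDeserializer implements KafkaRecordDeserializationSchema<String> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out)
            throws IOException {
        String value = new String(record.value(), StandardCharsets.UTF_8);
        if (value.startsWith("__END__")) { // user-specified end pattern (placeholder)
            out.close();                   // proposed signal: stop reading this split
            return;
        }
        out.collect(value);
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}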

Here are a few clarifications in response to the discussion:

1) The boundedness of the source and execution.runtime-mode would not be
affected by this proposal. Users can keep using the existing setting
with or without the dynamic EOF.
2) The dynamic EOF works independently of the
stoppingOffsetsInitializer. When reading from Kafka Source, users can
rely on just the dynamic EOF without specifying stoppingOffsetsInitializer.
And if users specify both dynamic EOF and stoppingOffsetsInitializer, the
job stops reading from the source split when either condition is met.
3) Suppose users can specify the dynamic EOF in
KafkaRecordDeserializationSchema::deserialize(...), then users have access
to the entire ConsumerRecord. This approach could address Ayush's use-case.
4) Suppose we choose to do it in
KafkaRecordDeserializationSchema::deserialize(...), then the dynamic EOF
happens inside the RecordEmitter. Yes we will need to be able to close the
split.
5) For the majority of users who do not want dynamic EOF, those users can
keep using the existing out-of-the-box support for Avro/Json/Protobuf. For
advanced users who want dynamic EOF, those users anyway need to encode the
dynamic EOF logic in a method similar to KafkaRecordDeserializationSchema
(with access to the raw message). Adding the dynamic EOF support would not
make their life harder.

Based on the discussion so far, it looks like two approaches have been
mentioned:

1) Let users call Collector::close() API inside
KafkaRecordDeserializationSchema::deserialize(...) to signal the EOF.

2) Add the API KafkaSourceBuilder::setBoundedStopCursor(StopCursor), where
StopCursor subsumes all existing functionalities of
the stoppingOffsetsInitializer. And StopCursor::shouldStop needs to take
both the raw and the deserialized message.

It seems that the second approach involves much more API change than the
first one (including the deprecation of some existing APIs).

Regarding the first approach, could you help explain why "close is the
completely wrong method for that"? My understanding is the close() method
indicates that the caller no longer needs to read from this source split
and the associated network resource could be released. Why is it wrong for
a user to call this method?


On Tue, Dec 28, 2021 at 5:15 PM Arvid Heise  wrote:

> Hi Dong,
>
> Could you help explain why we can not dynamically stop reading from a
>> source in batch mode?
>>
> We can, but we cannot easily determine whether the source is supposed to run in
> batch or streaming. A user would need to implement a special
> KafkaRecordDeserializationSchema and still provide an OffsetInitializer for
> the end offset to trigger batch mode.
>
> How are both concepts supposed to interact? Are we only stopping if any of
> the concepts states that this is the end?
>
> We could ofc offer some KafkaSourceBuilder#setBounded() without parameters
> so that a user can implement a special KafkaRecordDeserializationSchema and
> notify the builder but this looks awkward to me and is quite error-prone:
> When a user uses setBounded without overriding isEndOfStream, the
> application would never emit anything.
>
> My understanding is that when a message with the particular pattern
>> (specified by the user) is encountered, we can have the source operator
>> emit the high-watermark in such a way as if the particular partition of
>> this source has reached EOF. And this must have worked since users have
>> been using KafkaDeserializationSchema::isEndOfStream with the
>> legacy FlinkKafkaConsumer. Did I miss something here?
>>
> Yes batch mode is different from bounded streaming. [1] We can only fully
> leverage a statically bounded source by statically defining it as such with
> the FLIP-27 Source interface. [2]
>
> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
>> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
>> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
>> harder for user to implement KafkaRecordDeserializationSchema?
>>
> Users mostly use the factory methods that adapt to Flink's
> DeserializationSchema. We should also offer a builder similarly to
> KafkaRecordSerializationSchemaBuilder.
>
> Regarding "how to use it from Table/SQL", suppose we allow users to encode
>> this dynamic EOF logic inside KafkaRecordDeserializationSchema.
>
> I'm not sure if we can/should expose dynamic EOF in SQL but at the very
> least we should properly support en

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-28 Thread Arvid Heise
Hi Dong,

> Could you help explain why we can not dynamically stop reading from a
> source in batch mode?
>
We can, but we cannot easily determine whether the source is supposed to run in
batch or streaming. A user would need to implement a special
KafkaRecordDeserializationSchema and still provide an OffsetInitializer for
the end offset to trigger batch mode.
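
Roughly, the combination I mean looks like this (broker, topic, and
MyEndAwareDeserializer are placeholders; the latter stands in for the special
KafkaRecordDeserializationSchema mentioned above):

import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

class BoundedKafkaSourceSketch {
    static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")                 // placeholder
                .setTopics("input-topic")                           // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                // still needed today so the source is treated as bounded (batch mode)
                .setBounded(OffsetsInitializer.latest())
                // would additionally carry the dynamic end-of-stream check
                .setDeserializer(new MyEndAwareDeserializer())
                .build();
    }
}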

How are both concepts supposed to interact? Are we only stopping if any of
the concepts states that this is the end?

We could ofc offer some KafkaSourceBuilder#setBounded() without parameters
so that a user can implement a special KafkaRecordDeserializationSchema and
notify the builder but this looks awkward to me and is quite error-prone:
When a user uses setBounded without overriding isEndOfStream, the
application would never emit anything.

> My understanding is that when a message with the particular pattern
> (specified by the user) is encountered, we can have the source operator
> emit the high-watermark in such a way as if the particular partition of
> this source has reached EOF. And this must have worked since users have
> been using KafkaDeserializationSchema::isEndOfStream with the
> legacy FlinkKafkaConsumer. Did I miss something here?
>
Yes batch mode is different from bounded streaming. [1] We can only fully
leverage a statically bounded source by statically defining it as such with
the FLIP-27 Source interface. [2]

> Hmm.. users already need to provide a KafkaRecordDeserializationSchema
> via KafkaSourceBuilder::setDeserializer(...) today even if we don't support
> dynamic EOF. Do you mean that if we support dynamic EOF, then it will be
> harder for user to implement KafkaRecordDeserializationSchema?
>
Users mostly use the factory methods that adapt to Flink's
DeserializationSchema. We should also offer a builder similarly to
KafkaRecordSerializationSchemaBuilder.
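
For example, a minimal usage sketch of such an adapter:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

// Wraps a plain Flink DeserializationSchema so it can be used by the Kafka source.
KafkaRecordDeserializationSchema<String> schema =
        KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema());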

> Regarding "how to use it from Table/SQL", suppose we allow users to encode
> this dynamic EOF logic inside KafkaRecordDeserializationSchema.

I'm not sure if we can/should expose dynamic EOF in SQL but at the very
least we should properly support end offsets (as it's now possible). We
must avoid removing the current end offsets in favor of
KafkaRecordDeserializationSchema#isEndOfStream to enable Table/SQL to use
bounded Kafka sources.

> e.g. call Collector::close() if the message content matches a
> user-specified pattern
>
No, close is the completely wrong method for that. This method should have
never been exposed to the user as it will close the network resources.
However, we need a fully functional network stack for proper shutdown.

> It appears that StopCursor::shouldStop(...) takes a raw Message. While user
> could implement the dynamic EOF logic in this method, I am worried that
> this approach would lead to inferior performance due to double message
> deserialization.
>
That is a fair point. In Ayush's case, however, it's the only way to
determine that the pipeline should stop (you pretty much check whether the 5th
byte in the message has changed). If you deserialize into a SpecificRecord,
then the writer schema version is lost for isEndOfStream(T deserialized).

Another concern I have for
KafkaRecordDeserializationSchema#isEndOfStream(T) is where it is supposed
to be called then. If it's in the RecordEmitter, we need to extend the
RecordEmitter to support closing the split. If it's in the SplitReader, we
probably also need double-deserialization because of FLINK-25132 (the
record needs to be deserialized in the RecordEmitter). Maybe you can encode
it in the SplitState but this sounds rather cumbersome if it needs to be
done for all sources.

> The reason is that the user's logic will likely depend on the de-serialized
> message (as opposed to the raw byte in the
> org.apache.pulsar.client.api.Message.getData()). In this case, users will
> need to deserialize the message inside StopCursor::shouldStop(...) first
> and then the message would be de-serialized again by
> the PulsarDeserializationSchema, which is specified via
> the PulsarSourceBuilder::setDeserializationSchema.
>
As written before, this is not the case for this specific user. Having the
raw message makes it much easier to determine a writer schema change. I'm
sure that there are cases, where you need to look into the data though. To
avoid double-deserialization, a better way may be to pass both the raw and
the deserialized message to `shouldStop` but then we should move the stop
logic to RecordEmitter as written before.
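
Hypothetically, the stop check could then look something like this (not an
existing Flink or Pulsar interface, just the shape I have in mind):

public interface StopCondition<RAW, T> {

    /**
     * Decide whether the split should stop, given both the raw message and the
     * value already produced by the deserialization schema, so nothing has to
     * be deserialized twice.
     */
    boolean shouldStop(RAW rawMessage, T deserializedValue);
}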

> Do you mean that you prefer to replace
> KafkaSourceBuilder::setBounded(OffsetsInitializer) with something like
> PulsarSourceBuilder::setBoundedStopCursor(StopCursor)?
>
Ideally, yes. But that needs to be backward compatible as it's a
PublicEvolving interface.

> I agree it is cleaner to let PartitionOffsetsRetrieverImpl use adminClient
> only without using KafkaClient. On the other hand, it seems that there is
> no performance/correctness concern with the existing approach? Is this
> issue related to the dis

MapState got wrong UK when restored.

2021-12-28 Thread Joshua Fan
Hi All
My Flink version is 1.11, the state backend is RocksDB, and I want to write
a Flink job that implements an adaptive window. I wrote a Flink DAG like the one below:
> DataStream<DataEntity> entities = env.addSource(new EntitySource()).setParallelism(1);
>
> entities.keyBy(DataEntity::getName).process(new EntityKeyedProcessFunction()).setParallelism(p);

The important code is in the EntityKeyedProcessFunction, which is attached. I
have a MapState in it like 'private transient MapState entityStates;'.
I print the content of the MapState when a checkpoint completes, and the content
is OK, like below:

> 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] -
> >the key is 164067960
> 2021-12-28 16:22:05,487 INFO window.EntityKeyedProcessFunction [] - the
> value is name = hippopotamus, value = [window.DataEntity@b9266a0, window.
> DataEntity@682fce90]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = crocodile, value = [window.DataEntity@16d20045]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = dolphin, value = [window.DataEntity@1820c75a, window.
> DataEntity@64f3b9f6]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Dragonfly, value = [window.DataEntity@2b2ad03]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Hedgehog, value = [window.DataEntity@65f39671, window.
> DataEntity@2df6b2bf]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Bee, value = [window.DataEntity@13249998]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = cicada, value = [window.DataEntity@7266e125, window.
> DataEntity@167cf1ae]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Mosquito, value = [window.DataEntity@2596aa5a, window.
> DataEntity@603c0804]
> 2021-12-28 16:22:05,488 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Crane, value = [window.DataEntity@2a3192e9, window.
> DataEntity@3a65398f]
>
The key of the MapState is a timestamp which was coalesced to the minute.

But when the job restarted from a checkpoint, the content of the MapState
changed; actually, the keys of the MapState changed, as shown below.

> 2021-12-28 16:15:45,379 INFO window.EntityKeyedProcessFunction [] -
> >the key is carp
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = hippopotamus, value = [window.DataEntity@510d4c4b, window.
> DataEntity@7857e387]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = crocodile, value = [window.DataEntity@31366a33, window.
> DataEntity@a62074a]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Dragonfly, value = [window.DataEntity@56db63fa, window.
> DataEntity@54befce0, window.DataEntity@4e7cf96a]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Hedgehog, value = [window.DataEntity@7ad09313, window.
> DataEntity@592a2955]
> 2021-12-28 16:15:45,391 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Mosquito, value = [window.DataEntity@48c05cae]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the
> value is name = duck, value = [window.DataEntity@3e9ef1a4]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Rhinoceros, value = [window.DataEntity@25f11701, window.
> DataEntity@2334b667]
> 2021-12-28 16:15:45,392 INFO window.EntityKeyedProcessFunction [] - the
> value is name = Eagle, value = [window.DataEntity@7574eb4a]
>
It seems like the key of the restored MapState is the key of the keyed operator (the keyBy key).
My minute-based key is gone, replaced by the operator key.
It is so weird. Am I misusing the MapState?
Thanks.

Yours
Josh


EntityKeyedProcessFunction.java
Description: Binary data