Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther

Hi Aeden,

we updated the connector property design in 1.11 [1]. The old 
translation layer exists for backwards compatibility and is indicated by 
`connector.type=kafka`.


However, `connector = kafka` indicates the new property design and 
`key.fields` is only available there. Please check all properties again 
when upgrading, they are mentioned here [2].
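
For illustration, a minimal sketch of how the same source could be declared with the new property keys (the table name and column definitions here are only assumptions based on the configuration quoted below; adjust them to your actual schema):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
tEnv.executeSql(
        "CREATE TABLE my_topic_source (\n"
                + "  my_key_field STRING,\n"
                + "  my_value_field STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'kafka',\n"
                + "  'topic' = 'my-topic',\n"
                + "  'properties.bootstrap.servers' = '...',\n"
                + "  'properties.group.id' = 'my-consumer-group',\n"
                + "  'format' = 'avro',\n"
                + "  'key.format' = 'avro',\n"
                + "  'key.fields' = 'my_key_field'\n"
                + ")");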


Regards,
Timo


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory

[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/


On 06.01.21 18:35, Aeden Jameson wrote:

Yes, I do have that dependency. I see it in the dependency view of
IntelliJ and directly in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:


Hey,

have you added Kafka connector as the dependency? [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies

Best,
Piotrek

śr., 6 sty 2021 o 04:37 Aeden Jameson  napisał(a):


I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafka SQL Connector. My current connector is configured
as:

connector.type= 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = ''

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
 at 
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
 at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
 at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
 at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
 at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
 ... 21 more

I have validated that the uber jar clearly contains the 1.12
dependencies. What is the magic combination of properties to get
key.fields to work? Or is it not supported with Avro?

--
Thank You,
Aeden








Re: Using key.fields in 1.12

2021-01-07 Thread Aeden Jameson
Hi Timo,

Thanks for responding. You're right. So I did update the properties.
From what I can tell, the new design you're referring to uses
KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
option, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support those options. Is that right? So I updated my configuration to

connector= 'kafka'
topic   = 'my-topic'
properties.group.id = 'my-consumer-group'
properties.bootstrap.servers = '...'
format = 'avro'
format.avro-schema = ''
key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception.

Caused by: org.apache.flink.table.api.ValidationException: Unsupported
options found for connector 'kafka'.

Unsupported options:

format.avro-schema

Supported options:

connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.parallelism
sink.partitioner
sink.semantic
topic
topic-pattern
value.fields-include
value.format
at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
... 21 more

FAILURE: Build failed with an exception.




The format.avro-schema property was supported in what looks to me like the
old design in KafkaTableSourceSinkFactoryBase with this line,

properties.add(FORMAT + ".*");


https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

On Thu, Jan 7, 2021 at 12:15 AM Timo Walther  wrote:
>
> Hi Aeden,
>
> we updated the connector property design in 1.11 [1]. The old
> translation layer exists for backwards compatibility and is indicated by
> `connector.type=kafka`.
>
> However, `connector = kafka` indicates the new property design and
> `key.fields` is only available there. Please check all properties again
> when upgrading, they are mentioned here [2].
>
> Regards,
> Timo
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
>
>
> On 06.01.21 18:35, Aeden Jameson wrote:
> > Yes, I do have that dependency. I see it in the dependency view of
> > intellij and directly. in the uber jar. Thanks for responding.
> >
> > - Aeden
> >
> > On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:
> >>
> >> Hey,
> >>
> >> have you added Kafka connector as the dependency? [1]
> >>
> >> [1] 
> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> >>
> >> Best,
> >> Piotrek
> >>
> >> śr., 6 sty 2021 o 04:37 Aeden Jameson  napisał(a):
> >>>
> >>> I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> >>> feature of the Kafa SQL Connector. My current connector is configured
> >>> as ,
> >>>
> >>> connector.type= 'kafka'
> >>> connector.version = 'universal'
> >>> connector.topic   = 'my-topic'
> >>> connector.properties.group.id = 'my-consumer-group'
> >>> connector.properties.bootstrap.servers = '...'
> >>> format.type = 'avro'
> >>> format.avro-schema = ''
> >>>
> >>> I tried adding
> >>>
> >>> key.fields = 'my_key_field'
> >>>
> >>> as well as
> >>>
> >>> key.format = 'avro'
> >>> key.fields = 'my_key_field'
> >>>
> >>> but I get the exception
> >>>
> >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
> >>> Could not find a suitable table factory for
> >>> 'org.apache.flink.table.factories.TableSourceFactory' in
> >>> the classpath.
> >>>
> >>> Reason: No factory supports all properties.
> >>>
> >>> The matching candidates:
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>> Unsupported property keys:
> >>> key.fields
> >>> key.format
> >>>
> >>> The following factories have been considered:
> >>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> >>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> >>> org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
> >>>  at 
> >>> org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
> >>>  at 
> >>> org.apache.flink.table.factories.

Re: Using key.fields in 1.12

2021-01-07 Thread Timo Walther

Hi Aeden,

`format.avro-schema` is not required anymore in the new design. The Avro 
schema is derived entirely from the table's schema.


Regards,
Timo



On 07.01.21 09:41, Aeden Jameson wrote:

Hi Timo,

Thanks for responding. You're right. So I did update the properties.
From what I can tell, the new design you're referring to uses
KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
option, instead of KafkaTableSourceSinkFactoryBase, which doesn't
support those options. Is that right? So I updated my configuration to

connector= 'kafka'
topic   = 'my-topic'
properties.group.id = 'my-consumer-group'
properties.bootstrap.servers = '...'
format = 'avro'
format.avro-schema = ''
key.fields = 'my_key_field'

However, the property format.avro-schema doesn't appear to be
supported by KafkaDynamicTableFactory. I get this exception.

Caused by: org.apache.flink.table.api.ValidationException: Unsupported
options found for connector 'kafka'.

Unsupported options:

format.avro-schema

Supported options:

connector
format
key.fields
key.fields-prefix
key.format
properties.bootstrap.servers
properties.group.id
property-version
scan.startup.mode
scan.startup.specific-offsets
scan.startup.timestamp-millis
scan.topic-partition-discovery.interval
sink.parallelism
sink.partitioner
sink.semantic
topic
topic-pattern
value.fields-include
value.format
 at 
org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
 at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
 at 
org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
 at 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
 at 
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
 ... 21 more

FAILURE: Build failed with an exception.




The format.avro-schema property was supported in what looks to me like the
old design in KafkaTableSourceSinkFactoryBase with this line,

 properties.add(FORMAT + ".*");

 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160

Does format.avro-schema need to be specified differently?

Thank you,
Aeden

On Thu, Jan 7, 2021 at 12:15 AM Timo Walther  wrote:


Hi Aeden,

we updated the connector property design in 1.11 [1]. The old
translation layer exists for backwards compatibility and is indicated by
`connector.type=kafka`.

However, `connector = kafka` indicates the new property design and
`key.fields` is only available there. Please check all properties again
when upgrading, they are mentioned here [2].

Regards,
Timo


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/


On 06.01.21 18:35, Aeden Jameson wrote:

Yes, I do have that dependency. I see it in the dependency view of
intellij and directly. in the uber jar. Thanks for responding.

- Aeden

On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski  wrote:


Hey,

have you added Kafka connector as the dependency? [1]

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies

Best,
Piotrek

śr., 6 sty 2021 o 04:37 Aeden Jameson  napisał(a):


I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafa SQL Connector. My current connector is configured
as ,

connector.type= 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = ''

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
  at 
org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
  at 
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
  at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactorySe

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Aljoscha Krettek
This is somewhat unrelated to the discussion about how to actually do 
the triggering when sources shut down; I'll write on that separately. I 
just wanted to get this quick thought out.


This is about letting operators decide whether they actually want to wait for a 
final checkpoint, which is relevant at least for Async I/O and 
potentially for sinks.


We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down: Async I/O would wait for all 
requests, a sink would potentially wait for a given number of checkpoints. 
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points).


This way we would decouple that logic from things that don't actually 
need it. What do you think?
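
To make that a bit more concrete, a rough sketch of what such an interface could look like (using the placeholder names from above; nothing here is meant to be final):

public interface RequiresFinalization {

    /**
     * Called by the framework after each completed checkpoint (and potentially
     * at other points). Async I/O would return true once all in-flight requests
     * are done; a sink might only return true after a given number of
     * checkpoints.
     */
    boolean isFinalized();
}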


Best,
Aljoscha


Flink app logs to Elastic Search

2021-01-07 Thread bat man
Hi Team,

I have a requirement to push the Flink app logs to Elasticsearch for log
management. Can anyone offer guidance if you are already doing this?
I have tried this:
https://cristian.io/post/flink-log4j/
I'm not getting any errors for a sample job I tried.

I am using EMR to run Flink 1.9, with the latest Elasticsearch version running
on an EC2 machine.

Thanks,
Hemant


Re: Flink kafka exceptions handling

2021-01-07 Thread Piotr Nowojski
Hi Amira,

The previous topic you are referring to doesn't seem to be related
to your current problem.

Regarding your problem, I'm afraid I don't know the FlinkKafkaConsumer code
too well. Maybe someone else from the community could help?

Best,
Piotrek

śr., 6 sty 2021 o 19:01 BELGHITH Amira (EXT) 
napisał(a):

>
>
> Thank you for your answer.
>
> I have been subscribed.
>
>
>
> This is the previous topic I’m referring to
> http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACzKVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E
>
>
>
> *Our Flink job manager fails after multiple restarts, when the Kafka
> Consumer does not find a topic, for example. We have a Kafka exception,
> TopicUnthaurizationException. We listen to a list of topics, and whenever one
> is down, all our streaming system is down. Is there a way to handle
> those exceptions in the FlinkKafkaConsumer so the job manager does not
> fail?*
>
>
>
>
>
> *De :* Amira Belghith 
> *Envoyé :* mercredi 6 janvier 2021 18:36
> *À :* BELGHITH Amira (EXT) ResgGtsOpmOptVdf ;
> amira.belghith-...@soge.com
> *Objet :* Fwd: Flink kafka exceptions handling
>
>
>
> [EMETTEUR EXTERNE] / [EXTERNAL SENDER]
> Soyez vigilant avant d'ouvrir les pièces jointes ou de cliquer sur les
> liens. En cas de doute, signalez le message via le bouton "Message suspect"
> ou consultez go/secu.
> Be cautious before opening attachments or clicking on any links. If in
> doubt, use "Suspicious email" button or visit go/secu.
>
>
>
>
>
>
>
> -- Message transféré -
> De : *Piotr Nowojski* 
> Date : mer. 6 janv. 2021 à 17:26
> Objet : Re: Flink kafka exceptions handling
> À : Amira Belghith 
> CC : buggi...@gmail.com 
>
>
>
> I think you first need to be subscribed as it's explained here [1]. Could
> you also link to which previous topic are you referring to?
>
>
>
> Piotrek
>
>
>
> [1] https://flink.apache.org/community.html#mailing-lists
>
>
>
> śr., 6 sty 2021 o 17:09 Amira Belghith 
> napisał(a):
>
> Hey,
>
> Thanks for your fast reply.
>
> The mail couldnt be delivered to the mailing list.
>
>
>
> Le mer. 6 janv. 2021 à 16:59, Piotr Nowojski  a
> écrit :
>
> Hey,
>
>
>
> could you post the question on the user  mailing
> list?
>
>
>
> Thanks,
>
> Piotrek
>
>
>
> śr., 6 sty 2021 o 15:11 Amira Belghith 
> napisał(a):
>
> Hi Nick, Piotr,
>
>
>
> Im a software engineer working for Societe Generale bank.
>
> I saw your discussion about FlinkKafkaConsumer and exceptions handling.
>
> I have the same problem for a week now, and I wanted to know if you have
> found a solution.
>
> Our flink job manager fails after multiple restarting, when the Kafka
> Consumer does not find a topic for example. We have a kafka exception
> TopicUnthaurizationException. We listen to a list a topics and whenever one
> is down , all our streaming system is down .. is there a way to handle
> those exceptions in the FlinkKafkaConsumer so the job manager does not fail?
>
>
>
> Thanks a lot for your help,
>
> Amira belghith
>
>
>
> =
>
> Ce message et toutes les pieces jointes (ci-apres le "message")
> sont confidentiels et susceptibles de contenir des informations
> couvertes par le secret professionnel. Ce message est etabli
> a l'intention exclusive de ses destinataires. Toute utilisation
> ou diffusion non autorisee interdite.
> Tout message electronique est susceptible d'alteration. La SOCIETE GENERALE
> et ses filiales declinent toute responsabilite au titre de ce message
> s'il a ete altere, deforme falsifie.
>
> =
>
> This message and any attachments (the "message") are confidential,
> intended solely for the addresses, and may contain legally privileged
> information. Any unauthorized use or dissemination is prohibited.
> E-mails are susceptible to alteration. Neither SOCIETE GENERALE nor any
> of its subsidiaries or affiliates shall be liable for the message
> if altered, changed or falsified.
>
> =
>


Re: Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-07 Thread Yun Gao
Hi Billy,

I checked the provided example and found it should be a problem of the 
ContinuousFileReaderOperator, and I created an issue for it [1]. To temporarily work 
around the issue, I think you can disable chaining of the 
ContinuousFileReaderOperator with the following operators:

android.disableChaining().sinkTo(sink);
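
For context, a minimal sketch of how this fits into the sample pipeline quoted below (reusing the variables and classes from that sample):

DataStreamSource<AndroidData> android = env.readFile(
        new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);

// Work around FLINK-20888: disable chaining so the ContinuousFileReaderOperator
// does not end up in the same chain as the sink.
android.disableChaining().sinkTo(sink);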
Best,
 Yun

[1] https://issues.apache.org/jira/browse/FLINK-20888




 --Original Mail --
Sender:Billy Bain 
Send Date:Thu Jan 7 04:02:34 2021
Recipients:Arvid Heise 
CC:user , Billy Bain 
Subject:Re: Implementing a TarInputFormat based on FileInputFormat

Hi Arvid, 

Thanks for the response. I have created a sample application with input data 
and uploaded it to google drive. The sample data is in the archive... thus the 
large size. (27 mb)

https://drive.google.com/file/d/1dxpnDF3hPUPNlPO5p2tBf-88oOVV0qck/view?usp=sharing

To run it:
flink run  -Dexecution.runtime-mode=BATCH -c com.billybobbain.AndroidTarReader 
/path/to/flink-tar/build/libs/flink-tar-0.1.jar --input_path 
/path/to/flink-tar/playstore-00.tar.gz --output_path /path/to/output/

The main class:
public class AndroidTarReader {
    public static void main(String[] args) throws Exception {
        ParameterTool parameter = ParameterTool.fromArgs(args);
        String inputPath = parameter.get("input_path");
        String outputPath = parameter.get("output_path");
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<AndroidData> android = env.readFile(
                new TarInputFormat<>(new Path(inputPath), AndroidData.class), inputPath);
        final FileSink<AndroidData> sink = FileSink
                .forRowFormat(new Path(outputPath), new AndroidDataEncoder())
                .withOutputFileConfig(OutputFileConfig.builder()
                        .withPartPrefix("filtered")
                        .withPartSuffix(".json")
                        .build())
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(1024 * 1024)
                        .build())
                .build();
        android.sinkTo(sink);
        env.execute("zMarket Android");
    }
}

On Tue, Jan 5, 2021 at 5:59 AM Arvid Heise  wrote:

Hi Billy,

the exception is happening on the output side. Input side looks fine. Could you 
maybe post more information about the sink?

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:

I am trying to implement a class that will work similarly to AvroFileFormat. 

This tar archive has a very specific format. It has only one file inside, and 
that file is line-delimited JSON. 

I get this exception, but all the data is written to the temporary files. I 
have checked that my code isn't closing the stream, which was my prior issue. 

Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at 
org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
at 
org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
at 
org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
at 
org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown
 Source)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
at 
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:836)

public class TarInputFormat extends FileInputFormat implements 
ResultTypeQueryable {

private static final Logger logger = 
LoggerFactory.getLogger(TarInputFormat.class);
   

Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Arvid Heise
>
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down, Async I/O would wait for all
> requests, sink would potentially wait for a given number of checkpoints.
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points)


I think we are mixing two different things here that may require different
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only finish after having finished operations that do not
depend on data flow (async I/O, but I could also think of some timer
actions in process functions).

Your proposal would help most for the first case. The second case can be
solved entirely with current methods without being especially complicated:
- EOP is only emitted once Async I/O is done with all background tasks
- All timers are fired in a process function (I think we rather want to
fire immediately on EOP but that's a different discussion)
The advantage of this approach over your idea is that you don't need to
wait for a checkpoint to complete to check for finalization.

Now let's look at the first case. I see two alternatives:
- The new sink interface implicitly incorporates this listener. Since I
don't see a use case outside sinks, we could simply add this method to the
new sink interface.
- We implicitly assume that a sink is done after having a successful
checkpoint at the end. Then we just need a tag interface
`RequiresFinalization`. It also feels like we should add the property
`final` to checkpoint options to help the sink detect that this is the last
checkpoint to be taken. We could also try to always have the final
checkpoint without tag interface on new sinks...

On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek 
wrote:

> This is somewhat unrelated to the discussion about how to actually do
> the triggering when sources shut down, I'll write on that separately. I
> just wanted to get this quick thought out.
>
> For letting operators decide whether they actually want to wait for a
> final checkpoint, which is relevant at least for Async I/O and
> potentially for sinks.
>
> We could introduce an interface, sth like `RequiresFinalization` or
> `FinalizationListener` (all bad names). The operator itself knows when
> it is ready to completely shut down, Async I/O would wait for all
> requests, sink would potentially wait for a given number of checkpoints.
> The interface would have a method like `isFinalized()` that the
> framework can call after each checkpoint (and potentially at other
> points)
>
> This way we would decouple that logic from things that don't actually
> need it. What do you think?
>
> Best,
> Aljoscha
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Flink kafka exceptions handling

2021-01-07 Thread Aljoscha Krettek

Hi,

When you say that the `JobManager` goes down, you're referring to the 
fact that the Flink job will finish in a failed state after too many 
exceptions have occurred in the `FlinkKafkaConsumer`. Is that correct?


I'm afraid right now there is no code path that would allow catching 
those `TopicUnthaurizationException`. We basically treat most exceptions 
coming from Kafka as errors that require recovery.


What behaviour would you have in mind as a reaction to those exceptions?

Best,
Aljoscha

On 2021/01/06 17:59, BELGHITH Amira (EXT) wrote:


Thank you for your answer.
I have been subscribed.

This is the previous topic I’m referring to 
http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACzKVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E

Our flink job manager fails after multiple restarting, when the Kafka Consumer 
does not find a topic for example. We have a kafka exception 
TopicUnthaurizationException. We listen to a list a topics and whenever one is 
down , all our streaming system is down .. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?


De : Amira Belghith 
Envoyé : mercredi 6 janvier 2021 18:36
À : BELGHITH Amira (EXT) ResgGtsOpmOptVdf ; 
amira.belghith-...@soge.com
Objet : Fwd: Flink kafka exceptions handling

[EMETTEUR EXTERNE] / [EXTERNAL SENDER]
Soyez vigilant avant d'ouvrir les pièces jointes ou de cliquer sur les liens. En cas de 
doute, signalez le message via le bouton "Message suspect" ou consultez go/secu.
Be cautious before opening attachments or clicking on any links. If in doubt, use 
"Suspicious email" button or visit go/secu.



-- Message transféré -
De : Piotr Nowojski mailto:pnowoj...@apache.org>>
Date : mer. 6 janv. 2021 à 17:26
Objet : Re: Flink kafka exceptions handling
À : Amira Belghith mailto:belghith.am...@gmail.com>>
CC : buggi...@gmail.com 
mailto:buggi...@gmail.com>>

I think you first need to be subscribed as it's explained here [1]. Could you 
also link to which previous topic are you referring to?

Piotrek

[1] https://flink.apache.org/community.html#mailing-lists

śr., 6 sty 2021 o 17:09 Amira Belghith 
mailto:belghith.am...@gmail.com>> napisał(a):
Hey,
Thanks for your fast reply.
The mail couldnt be delivered to the mailing list.

Le mer. 6 janv. 2021 à 16:59, Piotr Nowojski 
mailto:pnowoj...@apache.org>> a écrit :
Hey,

could you post the question on the user 
mailto:user@flink.apache.org>> mailing list?

Thanks,
Piotrek

śr., 6 sty 2021 o 15:11 Amira Belghith 
mailto:belghith.am...@gmail.com>> napisał(a):
Hi Nick, Piotr,

Im a software engineer working for Societe Generale bank.
I saw your discussion about FlinkKafkaConsumer and exceptions handling.
I have the same problem for a week now, and I wanted to know if you have found 
a solution.
Our flink job manager fails after multiple restarting, when the Kafka Consumer 
does not find a topic for example. We have a kafka exception 
TopicUnthaurizationException. We listen to a list a topics and whenever one is 
down , all our streaming system is down .. is there a way to handle those 
exceptions in the FlinkKafkaConsumer so the job manager does not fail?

Thanks a lot for your help,
Amira belghith

=

Ce message et toutes les pieces jointes (ci-apres le "message")
sont confidentiels et susceptibles de contenir des informations
couvertes par le secret professionnel. Ce message est etabli
a l'intention exclusive de ses destinataires. Toute utilisation
ou diffusion non autorisee interdite.
Tout message electronique est susceptible d'alteration. La SOCIETE GENERALE
et ses filiales declinent toute responsabilite au titre de ce message
s'il a ete altere, deforme falsifie.

=

This message and any attachments (the "message") are confidential,
intended solely for the addresses, and may contain legally privileged
information. Any unauthorized use or dissemination is prohibited.
E-mails are susceptible to alteration. Neither SOCIETE GENERALE nor any
of its subsidiaries or affiliates shall be liable for the message
if altered, changed or falsified.

=


Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Khachatryan Roman
Thanks for starting this discussion (and sorry for probably duplicated
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and
adds complexity (new event).
However, the resources would be wasted for a relatively short time until
the job finishes completely.
And compared to other options, complexity seems much lower. Or are
differences in task completion times so huge and so common?

2. I think it would be helpful to describe how is rescaling handled in
Options 2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's
true for operator state, but what about channel state (captured by
unaligned checkpoint)? I think it still has to be sent downstream which
invalidates this Option.

Regards,
Roman


On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>
>
> I think we are mixing two different things here that may require different
> solutions:
> 1. Tasks (=sink) that may need to do something with the final checkpoint.
> 2. Tasks that only finish after having finished operations that do not
> depend on data flow (async I/O, but I could also think of some timer
> actions in process functions).
>
> Your proposal would help most for the first case. The second case can
> solved entirely with current methods without being especially complicated:
> - EOP is only emitted once Async I/O is done with all background tasks
> - All timers are fired in a process function (I think we rather want to
> fire immediately on EOP but that's a different discussion)
> The advantage of this approach over your idea is that you don't need to
> wait for a checkpoint to complete to check for finalization.
>
> Now let's look at the first case. I see two alternatives:
> - The new sink interface implicitly incorporates this listener. Since I
> don't see a use case outside sinks, we could simply add this method to the
> new sink interface.
> - We implicitly assume that a sink is done after having a successful
> checkpoint at the end. Then we just need a tag interface
> `RequiresFinalization`. It also feels like we should add the property
> `final` to checkpoint options to help the sink detect that this is the last
> checkpoint to be taken. We could also try to always have the final
> checkpoint without tag interface on new sinks...
>
> On Thu, Jan 7, 2021 at 11:58 AM Aljoscha Krettek 
> wrote:
>
>> This is somewhat unrelated to the discussion about how to actually do
>> the triggering when sources shut down, I'll write on that separately. I
>> just wanted to get this quick thought out.
>>
>> For letting operators decide whether they actually want to wait for a
>> final checkpoint, which is relevant at least for Async I/O and
>> potentially for sinks.
>>
>> We could introduce an interface, sth like `RequiresFinalization` or
>> `FinalizationListener` (all bad names). The operator itself knows when
>> it is ready to completely shut down, Async I/O would wait for all
>> requests, sink would potentially wait for a given number of checkpoints.
>> The interface would have a method like `isFinalized()` that the
>> framework can call after each checkpoint (and potentially at other
>> points)
>>
>> This way we would decouple that logic from things that don't actually
>> need it. What do you think?
>>
>> Best,
>> Aljoscha
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


RE: Flink kafka exceptions handling

2021-01-07 Thread BELGHITH Amira (EXT)
Thanks for your feedbacks.

Please find below my answers:



-Message d'origine-

De : Aljoscha Krettek 

Envoyé : jeudi 7 janvier 2021 13:55

À : user@flink.apache.org

Objet : Re: Flink kafka exceptions handling



[EMETTEUR EXTERNE] / [EXTERNAL SENDER]

Soyez vigilant avant d'ouvrir les pièces jointes ou de cliquer sur les liens. 
En cas de doute, signalez le message via le bouton "Message suspect" ou 
consultez go/secu.

Be cautious before opening attachments or clicking on any links. If in doubt, 
use "Suspicious email" button or visit go/secu.





Hi,



When you say that the `JobManager` goes down, you're referring to the fact that 
the Flink job will finish in a failed state after too many exceptions have 
occurred in the `FlinkKafkaConsumer. Is that correct?



--> Yes, this is exactly what I meant; the Flink job ends up in a failed state



I'm afraid right now there is no code path that would allow catching those 
`TopicUnthaurizationException`. We basically treat most exceptions coming from 
Kafka as errors that require recovery.



--> We can have other exceptions, like TimeoutException when our Kafka 
brokers are degraded.



What behaviour would you have in mind as a reaction to those exceptions?



--> Our processing system is supposed to continue streaming data even though 
there are some Kafka errors; we are expecting the KafkaConsumer to fail but 
not the Flink job. Do you think that is possible?



Best,

Aljoscha



On 2021/01/06 17:59, BELGHITH Amira (EXT) wrote:

>

>Thank you for your answer.

>I have been subscribed.

>

>This is the previous topic I’m referring to

>http://mail-archives.apache.org/mod_mbox/flink-user/202008.mbox/%3CCACz

>KVZQ093HixMewb_prtP41ceXgmxCv=cmpsbphw-9+h8b...@mail.gmail.com%3E

>

>Our flink job manager fails after multiple restarting, when the Kafka Consumer 
>does not find a topic for example. We have a kafka exception 
>TopicUnthaurizationException. We listen to a list a topics and whenever one is 
>down , all our streaming system is down .. is there a way to handle those 
>exceptions in the FlinkKafkaConsumer so the job manager does not fail?

>

>

>De : Amira Belghith  Envoyé : mercredi 6

>janvier 2021 18:36 À : BELGHITH Amira (EXT) ResgGtsOpmOptVdf

>; amira.belghith-...@soge.com Objet :

>Fwd: Flink kafka exceptions handling

>

>[EMETTEUR EXTERNE] / [EXTERNAL SENDER]

>Soyez vigilant avant d'ouvrir les pièces jointes ou de cliquer sur les liens. 
>En cas de doute, signalez le message via le bouton "Message suspect" ou 
>consultez go/secu.

>Be cautious before opening attachments or clicking on any links. If in doubt, 
>use "Suspicious email" button or visit go/secu.

>

>

>

>-- Message transféré -

>De : Piotr Nowojski mailto:pnowoj...@apache.org>>

>Date : mer. 6 janv. 2021 à 17:26

>Objet : Re: Flink kafka exceptions handling À : Amira Belghith

>mailto:belghith.am...@gmail.com>>

>CC : buggi...@gmail.com

>mailto:buggi...@gmail.com>>

>

>I think you first need to be subscribed as it's explained here [1]. Could you 
>also link to which previous topic are you referring to?

>

>Piotrek

>

>[1] https://flink.apache.org/community.html#mailing-lists

>

>śr., 6 sty 2021 o 17:09 Amira Belghith 
>mailto:belghith.am...@gmail.com>> napisał(a):

>Hey,

>Thanks for your fast reply.

>The mail couldnt be delivered to the mailing list.

>

>Le mer. 6 janv. 2021 à 16:59, Piotr Nowojski 
>mailto:pnowoj...@apache.org>> a écrit :

>Hey,

>

>could you post the question on the user 
>mailto:user@flink.apache.org>> mailing list?

>

>Thanks,

>Piotrek

>

>śr., 6 sty 2021 o 15:11 Amira Belghith 
>mailto:belghith.am...@gmail.com>> napisał(a):

>Hi Nick, Piotr,

>

>Im a software engineer working for Societe Generale bank.

>I saw your discussion about FlinkKafkaConsumer and exceptions handling.

>I have the same problem for a week now, and I wanted to know if you have found 
>a solution.

>Our flink job manager fails after multiple restarting, when the Kafka Consumer 
>does not find a topic for example. We have a kafka exception 
>TopicUnthaurizationException. We listen to a list a topics and whenever one is 
>down , all our streaming system is down .. is there a way to handle those 
>exceptions in the FlinkKafkaConsumer so the job manager does not fail?

>

>Thanks a lot for your help,

>Amira belghith

>

>=

>

>Ce message et toutes les pieces jointes (ci-apres le "message") sont

>confidentiels et susceptibles de contenir des informations couvertes

>par le secret professionnel. Ce message est etabli a l'intention

>exclusive de ses destinataires. Toute utilisation ou diffusion non

>autorisee interdite.

>Tout message electronique est susceptible d'alteration. La SOCIETE

>GENERALE et ses filiales declinent toute responsabilite au titre de ce

>message s'il a ete altere, deforme falsifie.

>

>=

Re: Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-07 Thread Yun Gao
Hi Roman,

   Many thanks for the feedback! I'll try to answer the issues inline:

> 1. Option 1 is said to be not preferable because it wastes resources and adds 
> complexity (new event). 
> However, the resources would be wasted for a relatively short time until the 
> job finishes completely. 
> And compared to other options, complexity seems much lower. Or are 
> differences in task completion times so huge and so common?

There might be mixed jobs with both bounded sources and unbounded sources; in 
this case, the resources for the finished 
part of the job would not be able to be released.

Option 1 also complicates the semantics of EndOfPartition: if we hold the 
tasks back but still need to 
notify the following tasks that all records have been sent, we would have to 
introduce some kind of pre-EndOfPartition message, 
which is similar to the current EndOfPartition but does not cause the channels 
to be released.

> 2. I think it would be helpful to describe how is rescaling handled in 
> Options 2 and 3 (or maybe it's not supported for jobs about to finish).

For Options 2 and 3 we manage the state at the granularity of the operator, thus the 
process of rescaling would be the same as for a normal checkpoint.
For example, suppose one operator resides in a task with parallelism 4. If 2 
of the subtasks are finished, the state of the operator is composed 
of the state of the 2 remaining subtask instances; if we rescale to 5 after 
failover, the state of the 2 previously remaining subtasks would be 
re-distributed 
to the 5 new subtasks after failover. 

If before failover all 4 subtasks are finished, the operator would be 
marked as finished; after failover the operator would still be marked as 
finished, 
and all the subtask instances of this operator would skip the methods like 
open(), endOfInput(), close() and would be excluded when taking checkpoints 
after failover.


> 3. Option 3 assumes that the state of a finished task is not used. That's 
> true for operator state, but what about channel state (captured by unaligned 
> checkpoint)?
> I think it still has to be sent downstream which invalidates this Option.

For unaligned checkpoints, if in one checkpoint a subtask is marked as finished, 
then its descendant tasks would wait until all the records are received 
from the finished tasks before taking the checkpoint; thus in this case we would 
not have result partition state, but only channel state for the 
downstream tasks that are still running.

In detail, suppose we have a job with the graph A -> B -> C, and suppose that in one 
checkpoint A has reported FINISHED. The CheckpointCoordinator would 
choose B as the new "source" to trigger the checkpoint via RPC. For task B, if it 
receives the checkpoint trigger, it would know that all its preceding tasks 
are finished; it would then wait until all the InputChannels have received 
EndOfPartition from the network (namely inputChannel.onBuffer() is called with 
EndOfPartition) and then take a snapshot of the input channels, as normal 
unaligned checkpoints do for the InputChannel side. Then 
we would be able to ensure the finished tasks always have an empty state.

I'll also update the FLIP to make it clearer.

Best,
 Yun



 --Original Mail --
Sender:Khachatryan Roman 
Send Date:Thu Jan 7 21:55:52 2021
Recipients:Arvid Heise 
CC:dev , user 
Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Thanks for starting this discussion (and sorry for probably duplicated 
questions, I couldn't find them answered in FLIP or this thread).

1. Option 1 is said to be not preferable because it wastes resources and adds 
complexity (new event). 
However, the resources would be wasted for a relatively short time until the 
job finishes completely. 
And compared to other options, complexity seems much lower. Or are differences 
in task completion times so huge and so common?

2. I think it would be helpful to describe how is rescaling handled in Options 
2 and 3 (or maybe it's not supported for jobs about to finish).

3. Option 3 assumes that the state of a finished task is not used. That's true 
for operator state, but what about channel state (captured by unaligned 
checkpoint)? I think it still has to be sent downstream which invalidates this 
Option.

Regards,
Roman

On Thu, Jan 7, 2021 at 1:21 PM Arvid Heise  wrote:

We could introduce an interface, sth like `RequiresFinalization` or 
`FinalizationListener` (all bad names). The operator itself knows when 
it is ready to completely shut down, Async I/O would wait for all 
requests, sink would potentially wait for a given number of checkpoints.  
The interface would have a method like `isFinalized()` that the 
framework can call after each checkpoint (and potentially at other 
points)

I think we are mixing two different things here that may require different 
solutions:
1. Tasks (=sink) that may need to do something with the final checkpoint.
2. Tasks that only f

Re: Flink UDF registration from jar at runtime

2021-01-07 Thread Jakub N
Hi Dawid,

The approach you sent indeed solved our problem.

You helped me and my colleague tremendously, great thanks.

Kind regards,

Jakub



From: Dawid Wysakowicz
Sent: Tuesday, January 5, 2021 16:57
To: Jakub N
Cc: user@flink.apache.org
Subject: Re: Flink UDF registration from jar at runtime


Hi Jakub,


Sorry for the late reply. I've just come back from the Christmas holidays.


Unfortunately you're right and it's not as easy as I originally thought. 
Apologies for that. It works when you use a constant because of constant 
expression reduction, so the function is executed before actually submitting 
the job to the cluster.


It does not work if the function needs to be invoked on the cluster, because 
Tasks use a classloader built from the jar files shipped through the BlobManager. 
Tasks cannot simply use a thread classloader, because that does not make much 
sense in a distributed setup. What you could try, though it is rather a 
hack, is to add the files with the generated classes to the cluster. You could try 
this approach:


URLClassLoader functionClassloader = ...

try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(functionClassloader)) {
EnvironmentSettings fsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

Configuration effectiveConfiguration = new Configuration();
effectiveConfiguration.set(DeploymentOptions.TARGET, "local");
effectiveConfiguration.set(DeploymentOptions.ATTACHED, true);
ConfigUtils.encodeCollectionToConfig(
effectiveConfiguration,
PipelineOptions.CLASSPATHS,
Arrays.asList(functionClassloader.getURLs().clone()),
URL::toString);
StreamExecutionEnvironment fsEnv = new 
StreamExecutionEnvironment(effectiveConfiguration);

StreamTableEnvironment fsTableEnv = 
StreamTableEnvironment.create(fsEnv, fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);
fsTableEnv.createFunction("StringFunc", stringFunc);
fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM 
recipes").print();

//Surprisingly the following line can find the StringFunc
//fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM 
recipes").print();
}


This manually creates a LocalEnvironment with the URLs added to the cluster 
classpath. If you use a RemoteEnvironment, you could use the ctor that accepts a 
globalClasspath. Bear in mind this uses lower-level APIs that have no stability 
guarantees.


Best,

Dawid


On 13/12/2020 21:22, Jakub N wrote:
Hi Dawid,

thanks a lot for your help. Unfortunately our issue still persists but with 
your example we managed to reconstruct our problem in the following code:

File folder = new File("custom");
URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
folder,
"StringFunc.java",
""
+ "import org.apache.flink.table.functions.ScalarFunction;"
+ "\n"
+ "public class StringFunc extends ScalarFunction {\n"
+ "\tpublic String eval(String b) {\n"
+ "\t\treturn b + \" : udf invoked\";\n"
+ "\t}\n"
+ "}"
);

EnvironmentSettings fsSettings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();

StreamExecutionEnvironment fsEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, 
fsSettings);

fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
fsEnv.getConfig().enableObjectReuse();

Class stringFunc = (Class) 
functionClassloader.loadClass("StringFunc");

List<String> data = new ArrayList<>();
data.add("cake");
Table table = fsTableEnv.fromDataStream(fsEnv.fromCollection(data));
table.printSchema();
fsTableEnv.registerTable("recipes", table);

try (TemporaryClassLoaderContext ignored = 
TemporaryClassLoaderContext.of(functionClassloader)) {
fsTableEnv.createFunction("StringFunc", stringFunc);
fsTableEnv.executeSql("SELECT StringFunc(f0), f0 FROM recipes").print();

//Surprisingly the following line can find the StringFunc
//fsTableEnv.executeSql("SELECT StringFunc('bread'), f0 FROM 
recipes").print();
}

This leads to a `java.lang.ClassNotFoundException: StringFunc`. As mentioned in 
the code, the commented line surprisingly works as intended.
Do you have any ideas on why this is the case?

Kind regards,

Jakub


__

SQL Function with more than one parameter IN

2021-01-07 Thread danp
Hi everyone, 

I just posted to the Apache Beam mailing list to find out if there is a good 
syntax for my use case. I've been following Flink for the last couple of years, 
so that's why I'm sending the same email here to you as well.

I've been reading through both the Beam and the Flink websites about UDFs and
UDAFs,
but I can't find an answer to my question, which is whether one can combine two
or more parameters into a UDF/UDAF. I'm working in finance, where for example
VWAP on time-series data is an important metric,
and for that metric you need both the price and the volume to get an
accurate result.

I've looked into
https://github.com/GoogleCloudPlatform/dataflow-sample-applications/tree/master/timeseries-streaming
which is a nice library for this kind of logic/metrics, but I still think
that SQL could be a better fit for 
this kind of financial calculation.

Therefore I wonder if it is possible to create a UDAF that takes, for
example, volume and price as two parameters 
and, via a window function, creates an accumulator that gives back the result?
Can one use a PAIR
in an SQL statement to use a TableAggregate with more than one parameter in Flink
SQL?
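
To make the question concrete, here is a rough sketch (assuming Flink's Table API AggregateFunction; the class and field names are just illustrative) of the kind of two-parameter aggregate I have in mind:

import org.apache.flink.table.functions.AggregateFunction;

public class Vwap extends AggregateFunction<Double, Vwap.Acc> {

    public static class Acc {
        public double priceVolume; // running sum of price * volume
        public double volume;      // running sum of volume
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    // accumulate() can declare more than one input parameter.
    public void accumulate(Acc acc, Double price, Double volume) {
        if (price != null && volume != null) {
            acc.priceVolume += price * volume;
            acc.volume += volume;
        }
    }

    @Override
    public Double getValue(Acc acc) {
        return acc.volume == 0.0 ? null : acc.priceVolume / acc.volume;
    }
}

Registered with createTemporarySystemFunction, I would hope to call it as VWAP(price, volume) inside a windowed query.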

Thanks in advance, 

Regards
Dan 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink Kafka integration issues

2021-01-07 Thread narasimha
Hi,

Context:
We built a fraud-detection kind of app.
The business logic is all fine, but when putting it into production, the Kafka cluster
is becoming unstable.

The topic to which it writes has approx. 80 events/sec. After running for a few
hours, the Kafka broker indexes are getting corrupted.

Topic config: single partition, replication 3.

This is making the consumer on this broker fail.

We haven't found any issues with load testing on non-prod environments.

Not sure what could cause the issue.

Kafka version - 2.3
Flink version - 1.11.2- scala-2.12
Using same version Kafka consumer, Kafka producer.

Any suggestions are welcome on what the cause could be and which areas we
have to look at.

-- 
Sent from my iPhone


Re: Using key.fields in 1.12

2021-01-07 Thread Aeden Jameson
Brilliant, thank you. That will come in handy. I was looking through the docs
hoping there was a way to still specify the schema, with no luck. Does such
an option exist?

On Thu, Jan 7, 2021 at 2:33 AM Timo Walther  wrote:

> Hi Aeden,
>
> `format.avro-schema` is not required anymore in the new design. The Avro
> schema is derived entirely from the table's schema.
>
> Regards,
> Timo
>
>
>
> On 07.01.21 09:41, Aeden Jameson wrote:
> > Hi Timo,
> >
> > Thanks for responding. You're right. So I did update the properties.
> >>From what I can tell the new design you're referring to uses the
> > KafkaDynamicTableFactory, which contains the KEY_FIELDS (key.fields)
> > options, instead of KafkaTableSourceSinkFactoryBase, which doesn't
> > support those options. Is that right? So I updated my configuration to
> >
> > connector= 'kafka'
> > topic   = 'my-topic'
> > properties.group.id = 'my-consumer-group'
> > properties.bootstrap.servers = '...'
> > format = 'avro'
> > format.avro-schema = ''
> > key.fields = 'my_key_field'
> >
> > However, the property format.avro-schema doesn't appear to be
> > supported by KafkaDynamicTableFactory. I get this exception.
> >
> > Caused by: org.apache.flink.table.api.ValidationException: Unsupported
> > options found for connector 'kafka'.
> >
> > Unsupported options:
> >
> > format.avro-schema
> >
> > Supported options:
> >
> > connector
> > format
> > key.fields
> > key.fields-prefix
> > key.format
> > properties.bootstrap.servers
> > properties.group.id
> > property-version
> > scan.startup.mode
> > scan.startup.specific-offsets
> > scan.startup.timestamp-millis
> > scan.topic-partition-discovery.interval
> > sink.parallelism
> > sink.partitioner
> > sink.semantic
> > topic
> > topic-pattern
> > value.fields-include
> > value.format
> >  at
> org.apache.flink.table.factories.FactoryUtil.validateUnconsumedKeys(FactoryUtil.java:324)
> >  at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validate(FactoryUtil.java:554)
> >  at
> org.apache.flink.table.factories.FactoryUtil$TableFactoryHelper.validateExcept(FactoryUtil.java:573)
> >  at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory.createDynamicTableSource(KafkaDynamicTableFactory.java:141)
> >  at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:122)
> >  ... 21 more
> >
> > FAILURE: Build failed with an exception.
> >
> >
> >
> >
> > The format.avro-schema property was supported it what looks to me the
> > old design in in KafkaTableSourceSinkFactoryBase with this line,
> >
> >  properties.add(FORMAT + ".*");
> >
> >
> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
> >
> > Does format.avro-schema need to be specified differently?
> >
> > Thank you,
> > Aeden
> >
> > On Thu, Jan 7, 2021 at 12:15 AM Timo Walther  wrote:
> >>
> >> Hi Aeden,
> >>
> >> we updated the connector property design in 1.11 [1]. The old
> >> translation layer exists for backwards compatibility and is indicated by
> >> `connector.type=kafka`.
> >>
> >> However, `connector = kafka` indicates the new property design and
> >> `key.fields` is only available there. Please check all properties again
> >> when upgrading, they are mentioned here [2].
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> [1]
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
> >> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/
> >>
> >>
> >> On 06.01.21 18:35, Aeden Jameson wrote:
> >>> Yes, I do have that dependency. I see it in the dependency view of
> >>> intellij and directly. in the uber jar. Thanks for responding.
> >>>
> >>> - Aeden
> >>>
> >>> On Wed, Jan 6, 2021 at 8:06 AM Piotr Nowojski 
> wrote:
> 
>  Hey,
> 
>  have you added Kafka connector as the dependency? [1]
> 
>  [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/kafka.html#dependencies
> 
>  Best,
>  Piotrek
> 
>  śr., 6 sty 2021 o 04:37 Aeden Jameson 
> napisał(a):
> >
> > I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
> > feature of the Kafa SQL Connector. My current connector is configured
> > as ,
> >
> > connector.type= 'kafka'
> > connector.version = 'universal'
> > connector.topic   = 'my-topic'
> > connector.properties.group.id = 'my-consumer-group'
> > connector.properties.bootstrap.servers = '...'
> > format.type = 'avro'
> > format.avro-schema = ''
> >
> > I tried adding
> >
> > key.fields = 'my_key_field'
> >
> > as well as
> >
> > key.format = 'avro'
> > key.fields = 'my_key_field'
> >
> > but I get the exception
> >
> > Caused by:
> org.apache.flink.table.api.NoM

Native Kubernetes deployment with GitOps

2021-01-07 Thread Alex Adriaanse
I'm trying to set up a Flink 1.12 deployment on a Kubernetes cluster using 
custom Docker images (since the official ones aren't out yet). Since the 
documentation states that "We generally recommend new users to deploy Flink on 
Kubernetes using native Kubernetes deployments", I'm trying out the native 
Kubernetes integration.

At the same time, I'm also trying to move to GitOps where we use a tool like 
Argo CD to keep Kubernetes resources in sync with declarations stored in a Git 
repository. From what I can tell there's no way to do GitOps with native 
Kubernetes deployments, since the Kubernetes resources it produces are 
generated dynamically and are managed via CLI rather than being suitable for 
storage in a Git repository. Is that correct?

Would it be better in my case to just use the standalone Kubernetes deployment 
option and have our CD tool deploy the YAML files listed in Flink's 
documentation via Git? Or should I use the native Kubernetes integration to 
create an initial set of resources in our cluster, export those to a Git 
repository, and then use a GitOps workflow to modify those resources going 
forward rather than using the Flink CLI? In other words, I'd only use the Flink 
CLI to bootstrap the Git repository.

Besides ease of setup, are there any other benefits to the native Kubernetes 
integration compared to standalone deployments?

Thanks!

Alex

Re: Native Kubernetes deployment with GitOps

2021-01-07 Thread Israel Ekpo
If you are just getting started, running it on Kubernetes could simplify
the logistics and resources needed for getting started.

It also allows you to possibly reuse infrastructure that you may already be
using for other projects and purposes.

If you are just getting started and learning, the setup and teardown
time is much faster with Kubernetes (in my experience) compared to
setting it up as standalone on VMs.

Network connectivity to other components within my ecosystem is also easier
with K8s.

Other options are available here for your reference

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#overview-and-reference-architecture

If you are just starting out, I would recommend deploying your containers
on the K8s cluster via YAML manifests first and then converting to Helm charts
(once the vanilla YAML is stable) so that you can bundle the various
components as a single deployment.



On Thu, Jan 7, 2021 at 4:57 PM Alex Adriaanse  wrote:

> I'm trying to setup a Flink 1.12 deployment on a Kubernetes cluster using
> custom Docker images (since the official ones aren't out yet). Since the
> documentation states that "We generally recommend new users to deploy Flink
> on Kubernetes using native Kubernetes deployments", I'm trying out the
> native Kubernetes integration.
>
> At the same time, I'm also trying to move to GitOps where we use a tool
> like Argo CD to keep Kubernetes resources in sync with declarations stored
> in a Git repository. From what I can tell there's no way to do GitOps with
> native Kubernetes deployments, since the Kubernetes resources it produces
> are generated dynamically and are managed via CLI rather than being
> suitable for storage in a Git repository. Is that correct?
>
> Would it be better in my case to just use the standalone Kubernetes
> deployment option and have our CD tool deploy the YAML files listed in
> Flink's documentation via Git? Or should I use the native Kubernetes
> integration to create an initial set of resources in our cluster, export
> those to a Git repository, and then use a GitOps workflow to modify those
> resources going forward rather than using the Flink CLI? In other words,
> I'd only use the Flink CLI to bootstrap the Git repository.
>
> Besides ease of setup, are there any other benefits to the native
> Kubernetes integration compared to standalone deployments?
>
> Thanks!
>
> Alex


Re: Native Kubernetes deployment with GitOps

2021-01-07 Thread Alex Adriaanse
Thanks for the reply!

Just to clarify, when I talked about standalone deployments I was referring to 
standalone Kubernetes deployments. We currently have no interest in running 
Flink outside of K8s. I was mostly just curious about the differences in the 
native integration vs. standalone deployment options on Kubernetes.

As a side note, we're planning on using Flink in Application Mode.

Alex

On Jan 7, 2021, at 4:17 PM, Israel Ekpo wrote:

If you are just getting started, running it on Kubernetes could simplify that 
logistics and resources needed for getting started.

It also allows you to possibly reuse infrastructure that you may already be 
using for other projects and purposes.

If you are just getting started and just learning, the setup and teardown time 
is much faster with Kubernetes (in my experience) when compared to setting it 
as Standalone on VMs

Network connectivity to other components within my ecosystem is also easier 
when with K8s.

Other options are available here for your reference

https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/#overview-and-reference-architecture

If you are just starting out, I would recommend deploying your containers on 
the K8S cluster via YAML manifests first and then convert to Helm charts (once 
the vanilla YAML is stable) so that you can bundle the various components as a 
single deployment.



On Thu, Jan 7, 2021 at 4:57 PM Alex Adriaanse wrote:
I'm trying to setup a Flink 1.12 deployment on a Kubernetes cluster using 
custom Docker images (since the official ones aren't out yet). Since the 
documentation states that "We generally recommend new users to deploy Flink on 
Kubernetes using native Kubernetes deployments", I'm trying out the native 
Kubernetes integration.

At the same time, I'm also trying to move to GitOps where we use a tool like 
Argo CD to keep Kubernetes resources in sync with declarations stored in a Git 
repository. From what I can tell there's no way to do GitOps with native 
Kubernetes deployments, since the Kubernetes resources it produces are 
generated dynamically and are managed via CLI rather than being suitable for 
storage in a Git repository. Is that correct?

Would it be better in my case to just use the standalone Kubernetes deployment 
option and have our CD tool deploy the YAML files listed in Flink's 
documentation via Git? Or should I use the native Kubernetes integration to 
create an initial set of resources in our cluster, export those to a Git 
repository, and then use a GitOps workflow to modify those resources going 
forward rather than using the Flink CLI? In other words, I'd only use the Flink 
CLI to bootstrap the Git repository.

Besides ease of setup, are there any other benefits to the native Kubernetes 
integration compared to standalone deployments?

Thanks!

Alex



Flink taskmanager id

2021-01-07 Thread Deshpande, Omkar
Hello,

I use Flink on Kubernetes, and the taskmanagers get assigned random UUIDs. Is 
there a way to explicitly configure them to use hostnames instead?


Omkar


How should I process a cumulative counter?

2021-01-07 Thread Larry Aspen
Hi,

I'm evaluating Flink for our company's IoT use case and read a blog post
by Fabian Hueske from 2015 [1]. We have a similar situation except the
sensor is sending the value of a cumulative counter instead of a count.
We would like to calculate the sum of deltas of consecutive cumulative
counter values that occur during a time window.

Here is a scenario of a cumulative counter measuring runtime in seconds
and a machine starting for the first time at 12:00:00 and running for
the whole hour (sensor records values when it starts, every 15 minutes
and on hour change):

timestamp, cumulative counter value in seconds
12:00:00, 0
12:15:00, 900
12:30:00, 1800
12:45:00, 2700
13:00:00, 3600

This would produce the following deltas:
12:00:00, 900 - 0 = 900
12:15:00, 1800 - 900 = 900
12:30:00, 2700 - 1800 = 900
12:45:00, 3600 - 2700 = 900

We would then sum the deltas to get runtime in seconds for the hour:
900 + 900 + 900 + 900 = 3600

What would be a good way to handle this kind of calculation in Flink?

I have already tried using a tumbling event time window of one hour,
but then the last value is only part of the next window and the delta
of 12:45:00 is missing and the sum is 900 + 900 + 900 = 2700.

I have also tried a sliding event time window of two hours where the sum
is calculated for the first hour. This produces the correct sum in this
scenario but fails if the next reading only arrives in a later hour (e.g.
readings at 12:45:00 and 14:00:00, i.e. the machine is shut down between
12:45:00 and 13:00:00 and started again at 14:00:00).

My latest attempt has been to use a global window where I try to keep
the values for the last two hours and calculate the sum for the older
hour. This seems to work in my experiments where I read values from
a file and use parallelism of one. If I increase the parallelism, the
values are processed out of order and the results are incorrect as
older values are received after newer values which causes them to be
evicted.

Any advice on this would be appreciated.

Best regards,
Larry Aspen

[1] https://flink.apache.org/news/2015/12/04/Introducing-windows.html
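
One way to express this, sketched below for Flink 1.12 under the assumption
that readings arrive roughly in per-sensor order and that the watermark's
out-of-orderness bound covers one reporting interval plus some slack: keep the
previous reading in keyed state, emit each delta stamped with the *earlier*
reading's timestamp, and only then assign event time and apply an ordinary
one-hour tumbling window. The sensor id and the inlined sample data (12:00:00
mapped to epoch 0 for brevity) are illustrative only.

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class RuntimeDeltaSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (sensorId, timestampMillis, cumulativeRuntimeSeconds) -- the scenario from the mail.
        env.fromElements(
                Tuple3.of("machine-1", 0L, 0L),
                Tuple3.of("machine-1", 900_000L, 900L),
                Tuple3.of("machine-1", 1_800_000L, 1_800L),
                Tuple3.of("machine-1", 2_700_000L, 2_700L),
                Tuple3.of("machine-1", 3_600_000L, 3_600L))
            .keyBy(r -> r.f0)
            // Turn consecutive readings into deltas stamped with the *earlier* reading's time.
            .process(new DeltaFunction())
            // Assign event time from the delta's own timestamp, so the delta derived from
            // the 13:00:00 reading still lands in the 12:00-13:00 window.
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Long, Long>>forBoundedOutOfOrderness(Duration.ofMinutes(20))
                    .withTimestampAssigner((delta, ts) -> delta.f1))
            .keyBy(d -> d.f0)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .sum(2)
            .print();

        env.execute("cumulative counter deltas");
    }

    /** Emits (sensorId, previousTimestamp, counterDelta) for each pair of consecutive readings. */
    public static class DeltaFunction
            extends KeyedProcessFunction<String, Tuple3<String, Long, Long>, Tuple3<String, Long, Long>> {

        private transient ValueState<Tuple2<Long, Long>> lastReading; // (timestamp, counter)

        @Override
        public void open(Configuration parameters) {
            lastReading = getRuntimeContext().getState(new ValueStateDescriptor<>(
                "lastReading", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})));
        }

        @Override
        public void processElement(Tuple3<String, Long, Long> reading, Context ctx,
                                   Collector<Tuple3<String, Long, Long>> out) throws Exception {
            Tuple2<Long, Long> previous = lastReading.value();
            // Only emit a delta when the counter did not reset; attribute it to the
            // previous reading's timestamp so it falls into the hour the runtime accrued in.
            if (previous != null && reading.f2 >= previous.f1) {
                out.collect(Tuple3.of(reading.f0, previous.f0, reading.f2 - previous.f1));
            }
            lastReading.update(Tuple2.of(reading.f1, reading.f2));
        }
    }
}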


Flink to get historical data from kafka between timespan t1 & t2

2021-01-07 Thread VINAY.RAICHUR
Hi Flink Community Team,

This is a desperate request for your help with the questions below.

I am new to Flink and trying to use it with Kafka for event-based data 
stream processing in my project. I am struggling to find solutions with 
Flink for the project requirements below:


  1.  Get all Kafka topic records at a given time point 't' (now or in the 
past). Also, how can I pull only the latest record from Kafka using Flink?
  2.  Get all records from Kafka for a given time interval in the past, 
between t1 and t2.
  3.  Continuously get data from Kafka starting at a given time point (now 
or in the past); the client will actively cancel/close the data stream. 
Example: live dashboards. How can this be done with Flink?
Please provide sample Flink code snippets for pulling data from Kafka for the 
above three requirements. I have been stuck for the last month without much 
progress and your timely help will be a savior for me!
Thanks & Regards,
Vinay Raichur
T-Systems India | Digital Solutions
Mail: vinay.raic...@t-systems.com
Mobile: +91 9739488992
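
A minimal sketch that covers the three cases with the FlinkKafkaConsumer that
ships with Flink 1.12; the topic name, broker address, group id and the two
timestamps are placeholders. Points 1 and 3 use setStartFromTimestamp (or
setStartFromLatest for "only records produced from now on"); point 2 starts at
t1 and filters out records whose Kafka timestamp is after t2. Note that with
this approach the job does not terminate by itself at t2; it keeps running
until it is cancelled. The newer KafkaSource introduced in 1.12 supports
bounded reads if automatic termination is needed.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class KafkaTimeRangeSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");   // placeholder
        props.setProperty("group.id", "my-consumer-group");      // placeholder

        final long t1 = 1609459200000L; // placeholder: 2021-01-01T00:00:00Z in epoch millis
        final long t2 = 1609545600000L; // placeholder: 2021-01-02T00:00:00Z

        FlinkKafkaConsumer<String> consumer =
            new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

        // Requirements 1 and 3: start consuming at a point in time (now or in the past);
        // the job keeps reading new records until it is cancelled. For "only new records
        // from now on", use consumer.setStartFromLatest() instead.
        consumer.setStartFromTimestamp(t1);

        DataStream<String> fromT1 = env.addSource(consumer);

        // Requirement 2: additionally drop everything newer than t2. This relies on the
        // Kafka consumer attaching each record's Kafka timestamp to the stream element.
        DataStream<String> betweenT1AndT2 = fromT1.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                Long recordTs = ctx.timestamp();
                if (recordTs != null && recordTs <= t2) {
                    out.collect(value);
                }
            }
        });

        betweenT1AndT2.print();
        env.execute("kafka time range sketch");
    }
}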