Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Piotr Nowojski
Glad to hear it.

Best,
Piotrek

On Mon, Jan 10, 2022 at 8:08 PM Siddhesh Kalgaonkar wrote:

> Hi Piotr,
>
> Thanks for the reply. I was looking for how to create a DataStream under a
> process function since using that I had to call something else but I came
> across one of Fabian's posts where he mentioned that this way of creating
> DS is not "encouraged and tested". So, I figured out an alternate way of
> using side output and now I can do what I was aiming for.
>
> Thanks,
> Sid.
>
> On Mon, Jan 10, 2022 at 5:29 PM Piotr Nowojski 
> wrote:
>
>> Hi Sid,
>>
>> I don't see an explanation on the Stack Overflow post of what you are trying
>> to do here (no mention of a MapFunction or a tuple).
>>
>> If you want to create a `DataStream<String>` from a pre-existing/static
>> Tuple of Strings, the easiest thing would be to convert the
>> tuple to a collection/iterator and use
>> `StreamExecutionEnvironment#fromCollection(...)`.
>> If you already have a `DataStream<Tuple>` (for example your
>> source produces a tuple) and you want to flatten it to
>> `DataStream<String>`, then you need a simple
>> `FlatMapFunction<Tuple, String>` (or
>> `RichFlatMapFunction<Tuple, String>`), that would do the flattening
>> via:
>>
>> public void flatMap(Tuple value, Collector<String> out) throws
>> Exception {
>>   out.collect(value.f0);
>>   out.collect(value.f1);
>>   ...;
>>   out.collect(value.fN);
>> }
>>
>> Best,
>> Piotrek
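A minimal, self-contained sketch of the two options Piotrek describes above (class and variable names are illustrative, not from the original thread):

import java.util.Arrays;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TupleToDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: a static tuple of strings turned into a DataStream<String>.
        Tuple2<String, String> staticTuple = Tuple2.of("foo", "bar");
        DataStream<String> fromStatic =
                env.fromCollection(Arrays.asList(staticTuple.f0, staticTuple.f1));

        // Option 2: an existing DataStream of tuples flattened to DataStream<String>.
        DataStream<Tuple2<String, String>> tuples =
                env.fromElements(Tuple2.of("a", "b"), Tuple2.of("c", "d"));
        DataStream<String> flattened = tuples.flatMap(
                new FlatMapFunction<Tuple2<String, String>, String>() {
                    @Override
                    public void flatMap(Tuple2<String, String> value, Collector<String> out) {
                        out.collect(value.f0);
                        out.collect(value.f1);
                    }
                });

        fromStatic.print();
        flattened.print();
        env.execute("tuple-to-datastream-sketch");
    }
}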
>>
>> On Fri, Jan 7, 2022 at 7:05 AM Siddhesh Kalgaonkar wrote:
>>
>>> Hi Francis,
>>>
>>> What I am trying to do is you can see over here
>>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375
>>>
>>>
>>> On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy <
>>> francis.con...@switchdin.com> wrote:
>>>
 Hi Siddhesh,

 How are you getting this tuple of strings into the system? I think this
 is the important question, you can create a DataStream in many ways, from a
 collection, from a source, etc but all of these rely on the
 ExecutionEnvironment you're using.
 A RichMapFunction doesn't produce a datastream directly, it's used in
 the context of the StreamExecutionEnvironment to create a stream i.e.
 DataStream.map([YourRichMapFunction]) this implies that you already need a
 datastream to transform a datastream using a mapFunction
 (MapFunction/RichMapFunction)
 Francis

 On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
 kalgaonkarsiddh...@gmail.com> wrote:

> Hi,
>
> As I am new and I am facing one issue so I came
> across RichMapFunction. How can I use RichMapFunction to convert a tuple 
> of
> strings to datastream? If not how can I do it apart from using
> StreamExecutionEnvironment?
>
> Thanks,
> Sid
>


>>>


Parallelism of Flink SQL LookupTableSource in 1.14 ..

2022-01-10 Thread Jonathan Weaver
I'm attempting to do a proof of concept conversion of a DataStream based
Flink program over to using almost entirely Table SQL.

I have a primary CDC stream (an unbounded scan table source) that does two
joins to LookupTableSource tables and then on to a sink.

In the datastream program the only way to maintain throughput and not get
backpressured on the CDC stream was to set a carefully tuned parallelism on
the lookup functions to maximize the lookup capacity in the source systems.

However in the SQL programs it appears there is no setting I can find to
set a parallelism on the LookupTableSource tables, and the planner is
setting the parallelism to 1 which is only allowing roughly 1/10 the
capacity the source system can handle and backpressuring the CDC stream.

So my question is, is there a way to have the benefits of Table SQL
interface but also allow performance tuning on LookupTableSource tables? A
max parallelism of 1 will kill the attempted conversion immediately.

I love the Catalog interface and am attempting to turn all the custom
functions and lookups into tables that other developers can just write SQL
on. But the performance tuning is critical.

All the tables are being registered in a catalog using the
DynamicTableSource factories.

My SQL is basically of the form of
INSERT INTO sink
SELECT
   ...
FROM cdc_table cdc
JOIN lookup1 FOR SYSTEM_TIME AS OF cdc.proc_time look1 ON cdc.identifier =
look1.identifier
LEFT OUTER JOIN lookup2 FOR SYSTEM_TIME AS OF cdc.proc_time look2 ON
cdc.identifier = look2.identifier
WHERE conditions;

Any ways to force the planner to a specific parallelism?

Thanks for your time,
Jonathan
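For reference, a minimal sketch of setting a job-wide default parallelism through the Table API configuration. Note that `table.exec.resource.default-parallelism` is a global default for operators generated by the SQL planner, not a per-lookup setting, so it does not by itself answer the per-operator tuning question above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlDefaultParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Default parallelism applied to operators generated by the SQL planner.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 10);

        // The INSERT INTO ... SELECT with the lookup joins from the message above
        // would then be submitted via tEnv.executeSql("INSERT INTO sink SELECT ...");
    }
}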


Re: Orphaned job files in HDFS

2022-01-10 Thread Yang Wang
IIRC, the staging directory (/user/{name}/.flink/application_xxx) will be
deleted automatically if the Flink job reaches a global terminal state (e.g.
FINISHED, CANCELED, FAILED).
So I assume you have stopped the yarn application via "yarn application
-kill", not via "bin/flink cancel".
If that is the case, then the residual staging directory is expected
behavior, since the Flink JobManager does not get a chance to do the
clean-up.



Best,
Yang

On Tue, Jan 11, 2022 at 10:08 AM David Clutter wrote:

> I'm seeing files orphaned in HDFS and wondering how to clean them up when
> the job is completed.  The directory is /user/yarn/.flink so I am assuming
> this is created by flink?  The HDFS in my cluster eventually fills up.
>
> Here is my setup:
>
>- Flink 1.13.1 on AWS EMR
>- Executing flink in per-job mode
>- Job is submitted every 5m
>
> In HDFS under /user/yarn/.flink I see a directory created for every flink
> job submitted/yarn application.  Each application directory contains my
> user jar file, flink-dist jar, /lib with various flink jars,
> log4j.properties.
>
> Is there a property to tell flink to clean up this directory when the job
> is completed?
>


Re: 'Initial segment may not be null' error

2022-01-10 Thread Caizhi Weng
Hi!

This seems like a bug to me. Could you share the whole exception stack (if
the whole stack is not displayed in the terminal, you can search in the
logs) so that others can diagnose this problem?

On Mon, Jan 10, 2022 at 10:54 PM Egor Ryashin wrote:

> Hey,
>
> I use Flink 1.14 and run this query with sql-client:
>
> SET 'sql-client.execution.result-mode' = 'tableau';
> SET 'execution.runtime-mode' = 'batch';
>
> create table data(
>  id STRING,
>  account_id STRING,
>  a_id STRING,
> `timestamp` STRING
> ) with (
>  'connector'='filesystem',
>  'path'='/Volumes/mobiledisk/data',
>  'format' = 'json'
> );
>
> select count(distinct id) from (select id from data limit 100);`
>
> and get this error almost instantly:
> *[ERROR] Could not execute SQL statement. Reason:*
> *java.lang.NullPointerException: Initial Segment may not be null*
>
> (This query does work in streaming mode though. )
>
> Thanks
>
>


Re: Cannot load user class: avro GenericRecord

2022-01-10 Thread Caizhi Weng
Hi!

Could you share your pom.xml file of your user project? Did you include the
flink-avro dependency? Also did you add the avro format jar to the lib
directory of your Flink distribution?

On Tue, Jan 11, 2022 at 8:42 AM Jason Politis wrote:

> Good evening all,
>
> I'm working on a project for a client.  We are trying to execute Flink SQL
> using Table API in java.
> We are going to pull their data from oracle -> debezium -> kafka -> flink.
>
>
> Here is a sample of our java code:
>
> package carrera.build;
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
>
> public class BuildFlinkJob {
> public static void main(String[] args) throws Exception {
> EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> tEnv.executeSql(
> "CREATE TABLE BUILDS (\n" +
> "`PARTITION` INT METADATA FROM 'partition',\n" +
> "`OFFSET` BIGINT METADATA FROM 'offset',\n" +
> "BUILD_ID DOUBLE,\n" +
> "BUILD_NAME STRING,\n" +
> "FACILITY_NUMBER STRING,\n" +
> "START_DATE TIMESTAMP(2),\n" +
> "END_DATE TIMESTAMP(2),\n" +
> "RETAILERDIVISION_NAME STRING,\n" +
> "UPC STRING,\n" +
> "BUILD_INSTRUCTIONS STRING,\n" +
> "WORK_INSTRUCTIONS STRING,\n" +
> "IMAGE_FILE_PATH STRING\n" +
> ") WITH (\n" +
> "'connector' = 'kafka',\n" +
> "'topic' = 
> 'clients-name.OBIANEW_SDS_EBS_12_1_3.BUILDS',\n" +
> "'properties.bootstrap.servers' = 'broker:29092',\n" +
> "'properties.group.id' = 'builds',\n" +
> "'format' = 'debezium-avro-confluent',\n" +
> "'debezium-avro-confluent.url' = 
> 'http://schema-registry:8081',\n" +
> "'scan.startup.mode' = 'earliest-offset'\n" +
> ")"
> );
>
> tEnv.executeSql(
> "CREATE TABLE WC_FK_BUILD_D (\n" +
> "ROW_WID BIGINT,\n" +
> "BUILD_ID DOUBLE,\n" +
> "BUILD_NAME STRING,\n" +
> "FACILITY_NUMBER STRING,\n" +
> "START_DATE TIMESTAMP(0),\n" +
> "END_DATE TIMESTAMP(0),\n" +
> "DIVISION STRING,\n" +
> "UPC STRING,\n" +
> "EFFECTIVE_TO_DT TIMESTAMP(0),\n" +
> "DELETE_FLG STRING,\n" +
> "INTEGRATION_ID STRING,\n" +
> "X_CUSTOM STRING,\n" +
> "PRIMARY KEY (BUILD_ID) NOT ENFORCED\n" +
> ") WITH (\n" +
> "'connector' = 'upsert-kafka',\n" +
> "'topic' = 'WC_FK_BUILD_D',\n" +
> "'properties.bootstrap.servers' = 
> 'broker:29092',\n" +
> "'key.format' = 'avro-confluent',\n" +
> "'key.avro-confluent.url' = 
> 'http://schema-registry:8081',\n" +
> "'value.format' = 'avro-confluent',\n" +
> "'value.avro-confluent.url' = 
> 'http://schema-registry:8081'\n" +
> ")"
> );
>
> Table mapped = tEnv.sqlQuery(
> "SELECT \n" +
> "CAST((CAST((`PARTITION` + 1) as STRING) || '0' 
> || CAST(`OFFSET` as STRING)) as BIGINT),\n" +
> "BUILD_ID,\n" +
> "BUILD_NAME,\n" +
> "FACILITY_NUMBER,\n" +
> "START_DATE,\n" +
> "END_DATE,\n" +
> "RETAILERDIVISION_NAME as DIVISION,\n" +
> "UPC,\n" +
> "TIMESTAMP '3714-01-01 00:00:00' as 
> EFFECTIVE_TO_DT,\n" +
> "'N' as DELETE_FLG,\n" +
> "CAST(BUILD_ID as STRING) as INTEGRATION_ID,\n" +
> "'0' as X_CUSTOM\n" +
> "FROM BUILDS"
> );
>
> mapped.executeInsert("WC_FK_BUILD_D");
> }
> }
>
> These queries work perfectly fine directly in flink SQL client, but when
> trying to submit our jar as a job, we get this error:
>
> 2022-01-10 19:14:56
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java

Orphaned job files in HDFS

2022-01-10 Thread David Clutter
I'm seeing files orphaned in HDFS and wondering how to clean them up when
the job is completed.  The directory is /user/yarn/.flink so I am assuming
this is created by flink?  The HDFS in my cluster eventually fills up.

Here is my setup:

   - Flink 1.13.1 on AWS EMR
   - Executing flink in per-job mode
   - Job is submitted every 5m

In HDFS under /user/yarn/.flink I see a directory created for every flink
job submitted/yarn application.  Each application directory contains my
user jar file, flink-dist jar, /lib with various flink jars,
log4j.properties.

Is there a property to tell flink to clean up this directory when the job
is completed?


Cannot load user class: avro GenericRecord

2022-01-10 Thread Jason Politis
Good evening all,

I'm working on a project for a client.  We are trying to execute Flink SQL
using Table API in java.
We are going to pull their data from oracle -> debezium -> kafka -> flink.


Here is a sample of our java code:

package carrera.build;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class BuildFlinkJob {
public static void main(String[] args) throws Exception {
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv = TableEnvironment.create(settings);

tEnv.executeSql(
"CREATE TABLE BUILDS (\n" +
"`PARTITION` INT METADATA FROM 'partition',\n" +
"`OFFSET` BIGINT METADATA FROM 'offset',\n" +
"BUILD_ID DOUBLE,\n" +
"BUILD_NAME STRING,\n" +
"FACILITY_NUMBER STRING,\n" +
"START_DATE TIMESTAMP(2),\n" +
"END_DATE TIMESTAMP(2),\n" +
"RETAILERDIVISION_NAME STRING,\n" +
"UPC STRING,\n" +
"BUILD_INSTRUCTIONS STRING,\n" +
"WORK_INSTRUCTIONS STRING,\n" +
"IMAGE_FILE_PATH STRING\n" +
") WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' =
'clients-name.OBIANEW_SDS_EBS_12_1_3.BUILDS',\n" +
"'properties.bootstrap.servers' = 'broker:29092',\n" +
"'properties.group.id' = 'builds',\n" +
"'format' = 'debezium-avro-confluent',\n" +
"'debezium-avro-confluent.url' =
'http://schema-registry:8081',\n" +
"'scan.startup.mode' = 'earliest-offset'\n" +
")"
);

tEnv.executeSql(
"CREATE TABLE WC_FK_BUILD_D (\n" +
"ROW_WID BIGINT,\n" +
"BUILD_ID DOUBLE,\n" +
"BUILD_NAME STRING,\n" +
"FACILITY_NUMBER STRING,\n" +
"START_DATE TIMESTAMP(0),\n" +
"END_DATE TIMESTAMP(0),\n" +
"DIVISION STRING,\n" +
"UPC STRING,\n" +
"EFFECTIVE_TO_DT TIMESTAMP(0),\n" +
"DELETE_FLG STRING,\n" +
"INTEGRATION_ID STRING,\n" +
"X_CUSTOM STRING,\n" +
"PRIMARY KEY (BUILD_ID) NOT ENFORCED\n" +
") WITH (\n" +
"'connector' = 'upsert-kafka',\n" +
"'topic' = 'WC_FK_BUILD_D',\n" +
"'properties.bootstrap.servers' =
'broker:29092',\n" +
"'key.format' = 'avro-confluent',\n" +
"'key.avro-confluent.url' =
'http://schema-registry:8081',\n" +
"'value.format' = 'avro-confluent',\n" +
"'value.avro-confluent.url' =
'http://schema-registry:8081'\n" +
")"
);

Table mapped = tEnv.sqlQuery(
"SELECT \n" +
"CAST((CAST((`PARTITION` + 1) as STRING)
|| '0' || CAST(`OFFSET` as STRING)) as BIGINT),\n" +
"BUILD_ID,\n" +
"BUILD_NAME,\n" +
"FACILITY_NUMBER,\n" +
"START_DATE,\n" +
"END_DATE,\n" +
"RETAILERDIVISION_NAME as DIVISION,\n" +
"UPC,\n" +
"TIMESTAMP '3714-01-01 00:00:00' as
EFFECTIVE_TO_DT,\n" +
"'N' as DELETE_FLG,\n" +
"CAST(BUILD_ID as STRING) as INTEGRATION_ID,\n" +
"'0' as X_CUSTOM\n" +
"FROM BUILDS"
);

mapped.executeInsert("WC_FK_BUILD_D");
}
}

These queries work perfectly fine directly in flink SQL client, but when
trying to submit our jar as a job, we get this error:

2022-01-10 19:14:56
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(

Re: pyflink mixed with Java operators

2022-01-10 Thread Francis Conroy
Thanks for this Dian. I'll give that a try.

On Mon, 10 Jan 2022 at 22:51, Dian Fu  wrote:

> Hi,
>
> You could try the following method:
>
> ```
> from pyflink.java_gateway import get_gateway
>
> jvm = get_gateway().jvm
> ds = (
> DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
> )
> ```
>
> Regards,
> Dian
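For completeness, a sketch of what a Java MapFunction such as the hypothetical com.example.MyJavaMapFunction referenced above could look like (it would be packaged into a jar that the PyFlink job loads, e.g. via env.add_jars(...)):

package com.example;

import org.apache.flink.api.common.functions.MapFunction;

// Illustrative only: a trivial Java MapFunction that could be invoked from
// PyFlink through the Java gateway, as in the snippet above.
public class MyJavaMapFunction implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        return value == null ? null : value.toUpperCase();
    }
}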
>
> On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi all,
>>
>> Does anyone know if it's possible to specify a java map function at some
>> intermediate point in a pyflink job? In this case
>>
>> SimpleCountMeasurementsPerUUID
>>
>> is a flink java MapFunction. The reason we want to do this is that
>> performance in pyflink seems quite poor.
>> e.g.
>>
>> import logging
>> import os
>> import sys
>> import zlib
>>
>> import Measurements_pb2
>> from pyflink.common import Types
>> from pyflink.common.serialization import 
>> KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
>> from pyflink.datastream import StreamExecutionEnvironment, 
>> RuntimeExecutionMode, MapFunction, RuntimeContext, \
>> CheckpointingMode
>> from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, 
>> KafkaSink
>>
>> from functions.common import KeyByUUID
>> from functions.file_lister import auto_load_python_files
>> from customisations.serialisation import ZlibDeserializationSchema
>>
>>
>> class ZlibDecompressor(MapFunction):
>> def map(self, value):
>> decomp = zlib.decompress(value[1])
>> return value[0], decomp
>>
>>
>> class MeasurementSnapshotCountMapFunction(MapFunction):
>> def map(self, value):
>> pb_body = Measurements_pb2.MeasurementSnapshot()
>> pb_body.ParseFromString(value)
>> meas_count = len(pb_body.measurements)
>> if meas_count > 0:
>> first_measurement = pb_body.measurements[0]
>> point_uuid = first_measurement.point_uuid.value
>> timestamp = first_measurement.time
>>
>> return timestamp, point_uuid, meas_count
>>
>> return None
>>
>>
>> def word_count():
>> env = StreamExecutionEnvironment.get_execution_environment()
>> jarpath = 
>> f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
>> env.add_jars(jarpath)
>> auto_load_python_files(env)
>> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
>> # write all the data to one file
>> env.set_parallelism(1)
>> env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
>>
>> connection_config = RMQConnectionConfig.Builder() \
>> .set_host("rabbitmq") \
>> .set_port(5672) \
>> .set_virtual_host("/") \
>> .set_user_name("guest") \
>> .set_password("guest") \
>> .set_connection_timeout(60) \
>> .set_prefetch_count(5000) \
>> .build()
>>
>> deserialization_schema = ZlibDeserializationSchema()
>>
>> stream = env.add_source(RMQSource(
>> connection_config,
>> "flink-test",
>> False,
>> deserialization_schema,
>> )).set_parallelism(1)
>>
>> # compute word count
>> dstream = 
>> stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
>> .key_by(KeyByUUID(), key_type=Types.STRING()) \
>> .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # 
>> Hypothetical
>>
>> kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
>> .set_value_serialization_schema(SimpleStringSchema()) \
>> .set_topic("flink-test-kafka") \
>> .build()
>>
>> dstream.sink_to(
>> KafkaSink.builder() \
>> .set_record_serializer(kafka_serialisation_schema) \
>> .set_bootstrap_servers("kafka:9092") \
>> .build()
>> )
>>
>> # submit for execution
>> env.execute()
>>
>>
>> if __name__ == '__main__':
>> logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
>> format="%(message)s")
>> word_count()
>>
>>
>>
>>
>


Re: Avro BulkFormat for the new FileSource API?

2022-01-10 Thread Kevin Lam
Hi David,

Awesome, wasn't aware of FLINK-24565. That's the kind of thing we were
looking for and will take a look at it. Thanks for sharing that!



On Fri, Jan 7, 2022 at 2:05 PM David Morávek 
wrote:

> Hi Kevin,
>
> I'm not as familiar with initiatives around the new sources, but it seems
> that the BulkFormat for Avro [1] has been added recently and will be
> released with the Flink 1.15.x.
>
> [1] https://issues.apache.org/jira/browse/FLINK-24565
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 7:23 PM Kevin Lam  wrote:
>
>> Hi all,
>>
>> We're looking into using the new FileSource API, we see that there is a
>> BulkFormat for Parquet, via ParquetColumnarRowFormat.
>> Is there a similar BulkFormat available or in the works for Avro files?
>>
>> I imagined it may be a common use-case in the community so wanted to
>> check here before we invest time implementing our own.
>>
>> Thanks in advance!
>>
>


Re: Serving Machine Learning models

2022-01-10 Thread David Anderson
Another approach that I find quite natural is to use Flink's Stateful
Functions API [1] for model serving, and this has some nice advantages,
such as zero-downtime deployments of new models, and the ease with which
you can use Python. [2] is an example of this approach.

[1] https://flink.apache.org/stateful-functions.html
[2] https://github.com/ververica/flink-statefun-workshop

On Fri, Jan 7, 2022 at 5:55 PM Yun Gao  wrote:

> Hi Sonia,
>
> Sorry, I don't have statistics on the two methods you describe, but perhaps
> as input I could offer another option: there is an eco-project,
> dl-on-flink, that supports running DL frameworks on top of Flink and
> handles the data exchange between the Java and Python processes, which
> allows using the native model directly.
>
> Best,
> Yun
>
>
> [1] https://github.com/flink-extended/dl-on-flink
>
>
>
> --
> From:Sonia-Florina Horchidan 
> Send Time:2022 Jan. 7 (Fri.) 17:23
> To:user@flink.apache.org 
> Subject:Serving Machine Learning models
>
> Hello,
>
>
> I recently started looking into serving Machine Learning models for
> streaming data in Flink. To give more context, that would involve training
> a model offline (using PyTorch or TensorFlow), and calling it from inside a
> Flink job to do online inference on newly arrived data. I have found
> multiple discussions, presentations, and tools that could achieve this, and
> it seems like the two alternatives would be: (1) wrap the pre-trained
> models in a HTTP service (such as PyTorch Serve [1]) and let Flink do async
> calls for model scoring, or (2) convert the models into a standardized
> format (e.g., ONNX [2]), pre-load the model in memory for every task
> manager (or use external storage if needed) and call it for each new data
> point.
>
> Both approaches come with a set of advantages and drawbacks and, as far as
> I understand, there is no "silver bullet", since one approach could be more
> suitable than the other based on the application requirements. However, I
> would be curious to know what would be the "recommended" methods for model
> serving (if any) and what approaches are currently adopted by the users in
> the wild.
>
> [1] https://pytorch.org/serve/
>
> [2] https://onnx.ai/
>
> Best regards,
>
> Sonia
>
>
>  [image: Kth Logo]
>
> Sonia-Florina Horchidan
> PhD Student
> KTH Royal Institute of Technology
> *Software and Computer Systems (SCS)*
> School of Electrical Engineering and Computer Science (EECS)
> Mobil: +46769751562
> sf...@kth.se,  www.kth.se
>
>
>


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Siddhesh Kalgaonkar
Hi Piotr,

Thanks for the reply. I was looking for how to create a DataStream under a
process function since using that I had to call something else but I came
across one of Fabian's posts where he mentioned that this way of creating
DS is not "encouraged and tested". So, I figured out an alternate way of
using side output and now I can do what I was aiming for.

Thanks,
Sid.
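A minimal sketch of the side-output approach Sid mentions (the tag name, types, and filtering logic are illustrative assumptions, not taken from his code):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSketch {
    // Anonymous subclass so the generic type information is retained.
    private static final OutputTag<String> ERRORS = new OutputTag<String>("errors") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("ok:1", "bad:x", "ok:2");

        SingleOutputStreamOperator<String> main = input.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        if (value.startsWith("ok")) {
                            out.collect(value);          // main output
                        } else {
                            ctx.output(ERRORS, value);   // side output
                        }
                    }
                });

        DataStream<String> errors = main.getSideOutput(ERRORS);
        main.print("main");
        errors.print("side");
        env.execute("side-output-sketch");
    }
}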

On Mon, Jan 10, 2022 at 5:29 PM Piotr Nowojski  wrote:

> Hi Sid,
>
> I don't see an explanation on the Stack Overflow post of what you are trying
> to do here (no mention of a MapFunction or a tuple).
>
> If you want to create a `DataStream<String>` from a pre-existing/static
> Tuple of Strings, the easiest thing would be to convert the
> tuple to a collection/iterator and use
> `StreamExecutionEnvironment#fromCollection(...)`.
> If you already have a `DataStream<Tuple>` (for example your source
> produces a tuple) and you want to flatten it to `DataStream<String>`, then
> you need a simple `FlatMapFunction<Tuple, String>` (or
> `RichFlatMapFunction<Tuple, String>`), that would do the flattening
> via:
>
> public void flatMap(Tuple value, Collector<String> out) throws
> Exception {
>   out.collect(value.f0);
>   out.collect(value.f1);
>   ...;
>   out.collect(value.fN);
> }
>
> Best,
> Piotrek
>
> On Fri, Jan 7, 2022 at 7:05 AM Siddhesh Kalgaonkar wrote:
>
>> Hi Francis,
>>
>> What I am trying to do is you can see over here
>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375
>>
>>
>> On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy <
>> francis.con...@switchdin.com> wrote:
>>
>>> Hi Siddhesh,
>>>
>>> How are you getting this tuple of strings into the system? I think this
>>> is the important question, you can create a DataStream in many ways, from a
>>> collection, from a source, etc but all of these rely on the
>>> ExecutionEnvironment you're using.
>>> A RichMapFunction doesn't produce a datastream directly, it's used in
>>> the context of the StreamExecutionEnvironment to create a stream i.e.
>>> DataStream.map([YourRichMapFunction]) this implies that you already need a
>>> datastream to transform a datastream using a mapFunction
>>> (MapFunction/RichMapFunction)
>>> Francis
>>>
>>> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
>>> kalgaonkarsiddh...@gmail.com> wrote:
>>>
 Hi,

 As I am new and I am facing one issue so I came across RichMapFunction.
 How can I use RichMapFunction to convert a tuple of strings to datastream?
 If not how can I do it apart from using StreamExecutionEnvironment?

 Thanks,
 Sid

>>>
>>>
>>


Re: question about Statefun/Flink version compatibility

2022-01-10 Thread Igal Shilman
Hello Galen,
StateFun uses some of Flink's internal APIs, so they might or might not stay
compatible between versions. You can try bumping the version;
if it compiles cleanly, it will most likely work.
We will be porting the main branch to Flink 1.14 this or next week.

Cheers,
Igal.

On Mon, Jan 10, 2022 at 12:31 AM Galen Warren 
wrote:

> The statefun project currently references Flink 1.13.2. Is 1.13.2 the
> *minimum *Flink version required, i.e. would Statefun be expected to work
> with Flink 1.14.2 or whatever the latest released version is, as well? Or
> is it important to use exactly version 1.13.2 of Flink? Thanks.
>


Re: Uploading jar to s3 for persistence

2022-01-10 Thread David Morávek
I understand the issue.

We currently don't have a good mechanism for this kind of external file
management (we need to avoid leaking resources) :( Even right now, we rely
on the upload directory being cleaned up by the cluster manager (YARN,
k8s), because it is tied to the container lifecycle.

I could imagine something along the lines of storing this in the blob
server, but that would require additional metadata that we'd need to keep
in the HA services, which is something we want to avoid in general, and it
would add complexity.

Chesnay's proposal seems reasonable as this would make the REST API behave
more consistently with the CLI.

D.

On Mon, Jan 10, 2022 at 5:34 PM Puneet Duggal 
wrote:

> Hi,
>
> Ignore above reply. Got your point. Just one doubt. So is using 
> java.nio.file.FileSystem
> an expectation instead of Flink’s org.apache.flink.core.fs.FileSystem. I
> mean can we raise it as an issue to use flink filesystem instead as it
> allows us to use distributed filesystem as persistent storage of jars and
> hence can be accessible to all the job managers in HA setup.
>
> Currently since web.upload.dir can only assume value of local filesystem
> dir, there are cases where  job submission request is raised to other Job
> manager and hence leads to jar not found issue.
> Another use case that it will solve is that when job manager goes
> down/restarted, currently it deletes the dir containing all the jars. Use
> persistent storage will also help in this case.
>
> Best,
> Puneet Duggal
>
> On 10-Jan-2022, at 9:40 PM, Puneet Duggal 
> wrote:
>
> Hi Piotr,
>
> Thank you for your immediate reply. I went through this thread and it was
> also mentioned that flink required s3-filesystem related jars which are
> present in my HA flink cluster. Also as mentioned in Apache Flink
> Documentation for Amazon S3 integration ,
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
>
> It is mentioned that we can use s3 in all locations where Flink expects
> Filesystem URI. (including high availability and RocksDB State Backend).
>
> Regards,
> Puneet Duggal
>
> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski  wrote:
>
> Hi Puneet,
>
> Have you seen this thread before? [1]. It looks like the same issue and
> especially this part might be the key:
>
> > Be aware that the filesystem used by the FileUploadHandler
> > is java.nio.file.FileSystem and not
> > Flink's org.apache.flink.core.fs.FileSystem for which we provide
> different
> > FileSystem implementations.
>
> Best,
> Piotrek
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html
>
>
>
> On Mon, Jan 10, 2022 at 8:19 AM Puneet Duggal wrote:
>
>> Hi,
>>
>> Currently i am working with flink HA cluster with 3 job managers and 3
>> zookeeper nodes. Also i am persisting my checkpoints to s3 and hence
>> already configured required flink-s3 jars during flink job manager and task
>> manager process startup. Now i have configured a variable
>>
>> web.upload.dir: s3p://d11-flink-job-manager-load/jars
>>
>> Expectation is that jar upload via rest apis will be uploaded to this
>> location and hence is accessible to all 3 job managers (which eventually
>> will help in job submission as all 3 job managers will have record of
>> uploaded jar to this location). But while uploading the jar, I am facing
>> following Illegal Argument Exception which i am not sure why. Also above
>> provided s3 location was created before job manager process was even
>> started.
>>
>> *2022-01-09 18:12:46,790 WARN
>>  org.apache.flink.runtime.rest.FileUploadHandler  [] - File
>> upload failed.*
>> *java.lang.IllegalArgumentException: UploadDirectory is not absolute.*
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:59)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.nett

Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Piotr Nowojski
Ah, I see. Pity. You could always use reflection if you really had to, but
that's of course not a long term solution.

I will raise this issue to the KafkaSource/AWS contributors.

Best,
Piotr Nowojski

On Mon, Jan 10, 2022 at 4:55 PM Clayton Wohl wrote:

> Custom code can create subclasses of FlinkKafkaConsumer, because the
> constructors are public. Custom code can't create subclasses of KafkaSource
> because the constructors are package private. So the same solution of
> creating code subclasses won't work for KafkaSource.
>
> Thank you for the response :)
>
>
> On Mon, Jan 10, 2022 at 6:22 AM Piotr Nowojski 
> wrote:
>
>> Hi Clayton,
>>
>> I think in principle this example should be still valid, however instead
>> of providing a `CustomFlinkKafkaConsumer` and overriding it's `open`
>> method, you would probably need to override
>> `org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`.
>> So you would most likely need both at the very least a custom
>> `KafkaSourceReader` and `KafkaSource` to instantiate your custom
>> `KafkaSourceReader`. But I'm not sure if anyone has ever tried this so far.
>>
>> Best,
>> Piotrek
>>
>> On Fri, Jan 7, 2022 at 9:18 PM Clayton Wohl wrote:
>>
>>> If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the
>>> latter support this:
>>>
>>>
>>> https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html
>>>
>>> Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted
>>> Flink environment. I don't have reliable access to the local file system.
>>> At the documentation link above, Amazon recommends adding a hook to copy
>>> the keystore files from the classpath to a /tmp directory at runtime. Can
>>> KafkaSource do something similar?
>>>
>>


Re: Uploading jar to s3 for persistence

2022-01-10 Thread David Morávek
Hi Puneet,

this is a known limitation and unfortunately `web.upload.dir` currently
works only with the local system :( There are multiple issues covering this
already, I guess FLINK-16544 [1] summarizes the current state well.

This is something we want to address with the future releases. We've
briefly discussed it with Chesnay and the most straightforward / solid
approach would be making the "rest submission" stateless (having a single
endpoint that takes the jar and runs it right away).

We'll also try to make the documentation more explicit in terms of which
"file-based" config options only work with the local filesystem for the
1.15 release.

[1] https://issues.apache.org/jira/browse/FLINK-16544

Best,
D.

On Mon, Jan 10, 2022 at 5:11 PM Puneet Duggal 
wrote:

> Hi Piotr,
>
> Thank you for your immediate reply. I went through this thread and it was
> also mentioned that flink required s3-filesystem related jars which are
> present in my HA flink cluster. Also as mentioned in Apache Flink
> Documentation for Amazon S3 integration ,
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
>
> It is mentioned that we can use s3 in all locations where Flink expects
> Filesystem URI. (including high availability and RocksDB State Backend).
>
> Regards,
> Puneet Duggal
>
> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski  wrote:
>
> Hi Puneet,
>
> Have you seen this thread before? [1]. It looks like the same issue and
> especially this part might be the key:
>
> > Be aware that the filesystem used by the FileUploadHandler
> > is java.nio.file.FileSystem and not
> > Flink's org.apache.flink.core.fs.FileSystem for which we provide
> different
> > FileSystem implementations.
>
> Best,
> Piotrek
>
> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html
>
>
>
> On Mon, Jan 10, 2022 at 8:19 AM Puneet Duggal wrote:
>
>> Hi,
>>
>> Currently i am working with flink HA cluster with 3 job managers and 3
>> zookeeper nodes. Also i am persisting my checkpoints to s3 and hence
>> already configured required flink-s3 jars during flink job manager and task
>> manager process startup. Now i have configured a variable
>>
>> web.upload.dir: s3p://d11-flink-job-manager-load/jars
>>
>> Expectation is that jar upload via rest apis will be uploaded to this
>> location and hence is accessible to all 3 job managers (which eventually
>> will help in job submission as all 3 job managers will have record of
>> uploaded jar to this location). But while uploading the jar, I am facing
>> following Illegal Argument Exception which i am not sure why. Also above
>> provided s3 location was created before job manager process was even
>> started.
>>
>> *2022-01-09 18:12:46,790 WARN
>>  org.apache.flink.runtime.rest.FileUploadHandler  [] - File
>> upload failed.*
>> *java.lang.IllegalArgumentException: UploadDirectory is not absolute.*
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:59)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>
>>
>>
>>
>


Re: Uploading jar to s3 for persistence

2022-01-10 Thread Puneet Duggal
Hi,

Ignore the above reply. Got your point. Just one doubt: so is using
java.nio.file.FileSystem expected here, instead of Flink's
org.apache.flink.core.fs.FileSystem? I mean, can we raise an issue to use the
Flink FileSystem instead? That would allow a distributed filesystem to be used
as persistent storage for the jars and hence make them accessible to all the
job managers in an HA setup.

Currently, since web.upload.dir can only take a local filesystem directory,
there are cases where the job submission request is routed to another job
manager and hence leads to a "jar not found" issue.
Another case it would solve: when a job manager goes down or is restarted, it
currently deletes the directory containing all the jars. Using persistent
storage would also help there.

Best,
Puneet Duggal

> On 10-Jan-2022, at 9:40 PM, Puneet Duggal  wrote:
> 
> Hi Piotr,
> 
> Thank you for your immediate reply. I went through this thread and it was 
> also mentioned that flink required s3-filesystem related jars which are 
> present in my HA flink cluster. Also as mentioned in Apache Flink 
> Documentation for Amazon S3 integration ,
> 
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/
>  
> 
> 
> It is mentioned that we can use s3 in all locations where Flink expects 
> Filesystem URI. (including high availability and RocksDB State Backend).
> 
> Regards, 
> Puneet Duggal
> 
>> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski > > wrote:
>> 
>> Hi Puneet,
>> 
>> Have you seen this thread before? [1]. It looks like the same issue and 
>> especially this part might be the key:
>> 
>> > Be aware that the filesystem used by the FileUploadHandler
>> > is java.nio.file.FileSystem and not
>> > Flink's org.apache.flink.core.fs.FileSystem for which we provide different
>> > FileSystem implementations.
>> 
>> Best,
>> Piotrek
>> 
>> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html 
>> 
>> 
>> 
>> 
> On Mon, Jan 10, 2022 at 8:19 AM Puneet Duggal wrote:
>> Hi, 
>> 
>> Currently i am working with flink HA cluster with 3 job managers and 3 
>> zookeeper nodes. Also i am persisting my checkpoints to s3 and hence already 
>> configured required flink-s3 jars during flink job manager and task manager 
>> process startup. Now i have configured a variable 
>> 
>> web.upload.dir: s3p://d11-flink-job-manager-load/jars <>
>> 
>> Expectation is that jar upload via rest apis will be uploaded to this 
>> location and hence is accessible to all 3 job managers (which eventually 
>> will help in job submission as all 3 job managers will have record of 
>> uploaded jar to this location). But while uploading the jar, I am facing 
>> following Illegal Argument Exception which i am not sure why. Also above 
>> provided s3 location was created before job manager process was even started.
>> 
>> 2022-01-09 18:12:46,790 WARN  
>> org.apache.flink.runtime.rest.FileUploadHandler  [] - File 
>> upload failed.
>> java.lang.IllegalArgumentException: UploadDirectory is not absolute.
>>  at 
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
>> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:59)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
>>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> 
>> 
>> 
> 



Re: Uploading jar to s3 for persistence

2022-01-10 Thread Puneet Duggal
Hi Piotr,

Thank you for your immediate reply. I went through this thread and it was also 
mentioned that flink required s3-filesystem related jars which are present in 
my HA flink cluster. Also as mentioned in Apache Flink Documentation for Amazon 
S3 integration ,

https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/s3/

It is mentioned that we can use s3 in all locations where Flink expects 
Filesystem URI. (including high availability and RocksDB State Backend).

Regards, 
Puneet Duggal

> On 10-Jan-2022, at 7:13 PM, Piotr Nowojski  wrote:
> 
> Hi Puneet,
> 
> Have you seen this thread before? [1]. It looks like the same issue and 
> especially this part might be the key:
> 
> > Be aware that the filesystem used by the FileUploadHandler
> > is java.nio.file.FileSystem and not
> > Flink's org.apache.flink.core.fs.FileSystem for which we provide different
> > FileSystem implementations.
> 
> Best,
> Piotrek
> 
> [1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html 
> 
> 
> 
> 
> On Mon, Jan 10, 2022 at 8:19 AM Puneet Duggal wrote:
> Hi, 
> 
> Currently i am working with flink HA cluster with 3 job managers and 3 
> zookeeper nodes. Also i am persisting my checkpoints to s3 and hence already 
> configured required flink-s3 jars during flink job manager and task manager 
> process startup. Now i have configured a variable 
> 
> web.upload.dir: s3p://d11-flink-job-manager-load/jars <>
> 
> Expectation is that jar upload via rest apis will be uploaded to this 
> location and hence is accessible to all 3 job managers (which eventually will 
> help in job submission as all 3 job managers will have record of uploaded jar 
> to this location). But while uploading the jar, I am facing following Illegal 
> Argument Exception which i am not sure why. Also above provided s3 location 
> was created before job manager process was even started.
> 
> 2022-01-09 18:12:46,790 WARN  org.apache.flink.runtime.rest.FileUploadHandler 
>  [] - File upload failed.
> java.lang.IllegalArgumentException: UploadDirectory is not absolute.
>   at 
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) 
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:59) 
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
>  ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> 
> 
> 



Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Clayton Wohl
Custom code can create subclasses of FlinkKafkaConsumer, because the
constructors are public. Custom code can't create subclasses of KafkaSource
because the constructors are package private. So the same solution of
creating code subclasses won't work for KafkaSource.

Thank you for the response :)


On Mon, Jan 10, 2022 at 6:22 AM Piotr Nowojski  wrote:

> Hi Clayton,
>
> I think in principle this example should be still valid, however instead
> of providing a `CustomFlinkKafkaConsumer` and overriding it's `open`
> method, you would probably need to override
> `org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`.
> So you would most likely need both at the very least a custom
> `KafkaSourceReader` and `KafkaSource` to instantiate your custom
> `KafkaSourceReader`. But I'm not sure if anyone has ever tried this so far.
>
> Best,
> Piotrek
>
> On Fri, Jan 7, 2022 at 9:18 PM Clayton Wohl wrote:
>
>> If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the
>> latter support this:
>>
>>
>> https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html
>>
>> Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted
>> Flink environment. I don't have reliable access to the local file system.
>> At the documentation link above, Amazon recommends adding a hook to copy
>> the keystore files from the classpath to a /tmp directory at runtime. Can
>> KafkaSource do something similar?
>>
>


Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-10 Thread Kevin Lam
Hi Fabian,

Thanks for creating and sharing that ticket. I noticed the clause "The
FileSource can already read the state of the previous version", a little
off-topic from the original topic of this thread but I was wondering if you
could elaborate on that. Can the new FileSource interoperate with the old
.readFile operator state? Is there a smooth way to upgrade to the new
FileSource API from the old one without losing state?

Thanks!
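As a point of reference for the API being discussed, a minimal sketch of building a FLIP-27 FileSource (class names assume Flink 1.14, e.g. TextLineFormat; the path and format are illustrative, and equivalents of setNestedFileEnumeration/setFilesFilter are not shown since the thread notes they are missing):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("hdfs:///data/input"))
                        // keep scanning the directory for new files
                        .monitorContinuously(Duration.ofSeconds(30))
                        .build();

        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
        lines.print();
        env.execute("file-source-sketch");
    }
}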

On Mon, Jan 10, 2022 at 7:20 AM Fabian Paul  wrote:

> Hi Kevin,
>
> I created a ticket to track the effort [1]. Unfortunately, we are
> already in the last few weeks of the release cycle for 1.15 so I
> cannot guarantee that someone can implement it until then.
>
> Best,
> Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-25591
>
> On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam  wrote:
> >
> > Hi all,
> >
> > Are there any plans to update StreamExecutionEnvironment.readFiles to
> use the new FLIP-27 compatible FileSource?
> >
> > readFiles supports some features via it's FileInputFormat like
> setNestedFileEnumeration and setFilesFilter that we'd be interested in
> continuing to use but it seems FileSource doesn't have that.
>


'Initial segment may not be null' error

2022-01-10 Thread Egor Ryashin
Hey,

I use Flink 1.14 and run this query with sql-client:

SET 'sql-client.execution.result-mode' = 'tableau';
SET 'execution.runtime-mode' = 'batch';

create table data(
 id STRING,
 account_id STRING,
 a_id STRING,
`timestamp` STRING
) with (
 'connector'='filesystem',
 'path'='/Volumes/mobiledisk/data',
 'format' = 'json'
);

select count(distinct id) from (select id from data limit 100);`

and get this error almost instantly:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException: Initial Segment may not be null

(This query does work in streaming mode though. )

Thanks



Re: RowType for complex types in Parquet File

2022-01-10 Thread Krzysztof Chmielewski
Hi,
Isn't this actually already implemented and planned for version 1.15?
https://issues.apache.org/jira/browse/FLINK-17782

Regards,
Krzysztof Chmielewski

On Fri, Jan 7, 2022 at 4:20 PM Jing Ge wrote:

> Hi Meghajit,
>
> As the exception describes, Parquet schemas with nested columns are not
> currently supported. It is on our todo list with high priority.
>
> Best regards
> Jing
>
> On Fri, Jan 7, 2022 at 6:12 AM Meghajit Mazumdar <
> meghajit.mazum...@gojek.com> wrote:
>
>> Hello,
>>
>> The Flink documentation mentions how to create a FileSource for reading
>> Parquet files.
>> For primitive parquet types like BINARY and BOOLEAN, I am able to create
>> a RowType and read the fields.
>>
>> However, I have some nested fields in my parquet schema also like this
>> which I want to read :
>>
>>   optional group location = 11 {
>> optional double latitude = 1;
>> optional double longitude = 2;
>>   }
>>
>> How can I create a RowType for this ? I did something like this below,
>> but I got an exception `Caused by:
>> java.lang.UnsupportedOperationException: Complex types not supported`
>>
>> RowType nestedRowType = RowType.of(new LogicalType[] {new
>> DoubleType(), new DoubleType()}, new String[]{"latitude", "longitude"});
>> final LogicalType[] fieldTypes = new
>> LogicalType[]{nestedRowType};
>> final ParquetColumnarRowInputFormat format =
>> new ParquetColumnarRowInputFormat<>(
>> new Configuration(),
>> RowType.of(fieldTypes, new
>> String[]{"location"}),
>> 500,
>> false,
>> true);
>>
>


Re: Is there a way to know how long a Flink app takes to finish resuming from Savepoint?

2022-01-10 Thread Piotr Nowojski
Hi,

Unfortunately there is no such metric. Regarding the logs, I'm not sure
what Flink version you are using, but since Flink 1.13.0 [1][2] you can
rely on the tasks/subtasks switching from `INITIALIZING` to `RUNNING` to
check when the task/subtask has finished recovering its state.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-17012
[2] https://issues.apache.org/jira/browse/FLINK-22215

On Mon, Jan 10, 2022 at 9:34 AM Chen-Che Huang wrote:

> Hi all,
>
> I'm trying to speed up the process of resuming from a savepoint by
> adjusting some configuration.
> I wonder whether there exists a way to know how much time our Flink app
> spends resuming from a savepoint?
> From the logs, I can see only the starting time of the resuming (as shown
> below) but couldn't find the end time of the resuming.
> If there exists some metrics or information about the resuming time, it'd
> be very helpful for the tuning.
> Any comment is appreciated.
>
> timestamp-1: Starting job  from savepoint
> timestamp-2: Restoring job  from Savepoint
>
> Best wishes,
> Chen-Che Huang
>


ParquetColumnarRowInputFormat - parameter description

2022-01-10 Thread Krzysztof Chmielewski
Hi,
I would like to ask for some more details regarding the
three ParquetColumnarRowInputFormat construction parameters.

The parameters are:
batchSize,
isUtcTimestamp,
isCaseSensitive

The parameter names give some hint about their purpose, but there is no
description in the docs (Javadoc, Flink docs page).

Could you provide me some information about the batching process and other
two boolean flags?

Regards,
Krzysztof Chmielewski
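For reference, a sketch that mirrors the construction shown in the nested-RowType thread above and annotates the three parameters; the descriptions in the comments are a best-effort interpretation rather than official documentation:

import org.apache.hadoop.conf.Configuration;

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class ParquetFormatParametersSketch {
    public static ParquetColumnarRowInputFormat<FileSourceSplit> build() {
        RowType rowType = RowType.of(
                new LogicalType[] {new VarCharType(), new DoubleType()},
                new String[] {"id", "value"});

        return new ParquetColumnarRowInputFormat<>(
                new Configuration(), // Hadoop configuration used when opening the Parquet files
                rowType,             // projected row type to read
                500,                 // batchSize: number of rows per columnar batch held in memory
                false,               // isUtcTimestamp: whether timestamps are interpreted as UTC or local time
                true);               // isCaseSensitive: whether Parquet column names are matched case-sensitively
    }
}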


Re: Uploading jar to s3 for persistence

2022-01-10 Thread Piotr Nowojski
Hi Puneet,

Have you seen this thread before? [1]. It looks like the same issue and
especially this part might be the key:

> Be aware that the filesystem used by the FileUploadHandler
> is java.nio.file.FileSystem and not
> Flink's org.apache.flink.core.fs.FileSystem for which we provide different
> FileSystem implementations.

Best,
Piotrek

[1] https://www.mail-archive.com/user@flink.apache.org/msg38043.html



On Mon, Jan 10, 2022 at 8:19 AM Puneet Duggal wrote:

> Hi,
>
> Currently i am working with flink HA cluster with 3 job managers and 3
> zookeeper nodes. Also i am persisting my checkpoints to s3 and hence
> already configured required flink-s3 jars during flink job manager and task
> manager process startup. Now i have configured a variable
>
> web.upload.dir: s3p://d11-flink-job-manager-load/jars
>
> Expectation is that jar upload via rest apis will be uploaded to this
> location and hence is accessible to all 3 job managers (which eventually
> will help in job submission as all 3 job managers will have record of
> uploaded jar to this location). But while uploading the jar, I am facing
> following Illegal Argument Exception which i am not sure why. Also above
> provided s3 location was created before job manager process was even
> started.
>
> *2022-01-09 18:12:46,790 WARN
>  org.apache.flink.runtime.rest.FileUploadHandler  [] - File
> upload failed.*
> *java.lang.IllegalArgumentException: UploadDirectory is not absolute.*
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rest.handler.FileUploads.<init>(FileUploads.java:59)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:186)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
> at
> org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
> ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>
>
>
>


Re: Flink native k8s integration vs. operator

2022-01-10 Thread Gyula Fóra
Hi All!

This is a very interesting discussion.

I think many users find it confusing what deployment mode to choose when
considering a new production application on Kubernetes. With all the
options of native, standalone and different operators this can get tricky :)

I really like the idea that Thomas brought up to have at least a minimal
operator implementation in Flink itself to cover the most common production
job lifecycle management scenarios. I think the Flink community has a very
strong experience in this area to create a successful implementation that
would benefit most production users on Kubernetes.

Cheers,
Gyula

On Mon, Jan 10, 2022 at 4:29 AM Yang Wang  wrote:

> Thanks all for this fruitful discussion.
>
> I think Xintong has given a strong point why we introduced the native K8s
> integration, which is active resource management.
> I have a concrete example for this in the production. When a K8s node is
> down, the standalone K8s deployment will take longer
> recovery time based on the K8s eviction time(IIRC, default is 5 minutes).
> For the native K8s integration, Flink RM could be aware of the
> TM heartbeat lost and allocate a new one timely.
>
> Also, when introducing the native K8s integration, another point is that we
> should make it easy enough for users to migrate from a YARN deployment.
> They already have a production-ready job life-cycle management system,
> which is using Flink CLI to submit the Flink jobs.
> So we provide a consistent command "bin/flink run-application -t
> kubernetes-application/yarn-application" to start a Flink application and
> "bin/flink cancel/stop ..."
> to terminate a Flink application.
>
>
> Compared with K8s operator, I know that this is not a K8s
> native mechanism. Hence, I also agree that we still need a powerful K8s
> operator which
> could work with both standalone and native K8s modes. The major difference
> between them is how to start the JM and TM pods. For standalone,
> they are managed by K8s job/deployment. For native, maybe we could simply
> create a submission carrying the "flink run-application" arguments
> which is derived from the Flink application CR.
>
> Make the Flink's active resource manager can talk to the K8s operator is
> an interesting option, which could support both standalone and native.
> Then Flink RM just needs to declare the resource requirement(e.g. 2 * <2G,
> 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation
> to the K8s operator. It feels like an intermediate form between native and
> standalone mode :)
>
>
>
> Best,
> Yang
>
>
>
> Xintong Song  wrote on Fri, Jan 7, 2022 at 12:02:
>
>> Hi folks,
>>
>> Thanks for the discussion. I'd like to share my two cents on this topic.
>>
>> Firstly, I'd like to clarify my understanding of the concepts "native k8s
>> integration" and "active resource management".
>> - Native k8s integration means Flink's master interacts with k8s' api
>> server directly. It acts like embedding an operator inside Flink's master,
>> which manages the resources (pod, deployment, configmap, etc.) and watches
>> / reacts to related events.
>> - Active resource management means Flink can actively start / terminate
>> workers as needed. Its key characteristic is that the resource a Flink
>> deployment uses is decided by the job's execution plan, unlike the opposite
>> reactive mode (resource available to the deployment decides the execution
>> plan) or the standalone mode (both execution plan and deployment resources
>> are predefined).
>>
>> Currently, we have the yarn and native k8s deployments (and the recently
>> removed mesos deployment) in active mode, due to their ability to request /
>> release worker resources from the underlying cluster. And all the existing
>> operators, AFAIK, work with a Flink standalone deployment, where Flink
>> cannot request / release resources by itself.
>>
>> From this perspective, I think a large part of the native k8s integration
>> advantages come from the active mode: being able to better understand the
>> job's resource requirements and adjust the deployment resource accordingly.
>> Both fine-grained resource management (customizing TM resources for
>> different tasks / operators) and adaptive batch scheduler (rescale the
>> deployment w.r.t. different stages) fall into this category.
>>
>> I'm wondering if we can have an operator that also works with the active
>> mode. Instead of talking to the api server directly for adding / deleting
>> resources, Flink's active resource manager can talk to the operator (via
>> CR) about the resources the deployment needs, and let the operator to
>> actually add / remove the resources. The operator should be able to work
>> with (active) or without (standalone) the information of deployment's
>> resource requirements. In this way, users are free to choose between active
>> and reactive (e.g., HPA) rescaling, while always benefiting from the
>> beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
>> alig

Re: Custom Kafka Keystore on Amazon Kinesis Analytics

2022-01-10 Thread Piotr Nowojski
Hi Clayton,

I think in principle this example should still be valid; however, instead of
providing a `CustomFlinkKafkaConsumer` and overriding its `open` method,
you would probably need to override
`org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`.
So, at the very least, you would most likely need both a custom
`KafkaSourceReader` and a custom `KafkaSource` to instantiate that
`KafkaSourceReader`. But I'm not sure if anyone has ever tried this so far.

Best,
Piotrek
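
For reference, a minimal sketch of the classpath-to-/tmp copy step that the AWS example performs, which such a custom `KafkaSourceReader#start` override (or any other early hook) could call before the Kafka client is created. The resource name, target path, and class name below are illustrative assumptions:

```
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

public final class KeystoreInitializer {

    // File name and target directory are assumptions; adjust to your setup.
    private static final String KEYSTORE_RESOURCE = "kafka.client.truststore.jks";
    private static final String TARGET_PATH = "/tmp/kafka.client.truststore.jks";

    private KeystoreInitializer() {}

    /** Copies the keystore bundled on the classpath to a local path the Kafka client can read. */
    public static void copyKeystoreToTmp() {
        try (InputStream in =
                KeystoreInitializer.class.getClassLoader().getResourceAsStream(KEYSTORE_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(KEYSTORE_RESOURCE + " not found on classpath");
            }
            Files.copy(in, Paths.get(TARGET_PATH), StandardCopyOption.REPLACE_EXISTING);
        } catch (Exception e) {
            throw new RuntimeException("Failed to copy keystore to " + TARGET_PATH, e);
        }
    }
}
```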

On Fri, Jan 7, 2022 at 21:18 Clayton Wohl  wrote:

> If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the
> latter support this:
>
>
> https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html
>
> Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted
> Flink environment. I don't have reliable access to the local file system.
> At the documentation link above, Amazon recommends adding a hook to copy
> the keystore files from the classpath to a /tmp directory at runtime. Can
> KafkaSource do something similar?
>


Re: Plans to update StreamExecutionEnvironment.readFiles to use the FLIP-27 compatible FileSource?

2022-01-10 Thread Fabian Paul
Hi Kevin,

I created a ticket to track the effort [1]. Unfortunately, we are
already in the last few weeks of the release cycle for 1.15 so I
cannot guarantee that someone can implement it until then.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-25591

On Fri, Jan 7, 2022 at 5:07 PM Kevin Lam  wrote:
>
> Hi all,
>
> Are there any plans to update StreamExecutionEnvironment.readFiles to use the 
> new FLIP-27 compatible FileSource?
>
> readFiles supports some features via its FileInputFormat, like 
> setNestedFileEnumeration and setFilesFilter, that we'd be interested in 
> continuing to use, but it seems FileSource doesn't have those.


Re: RichMapFunction to convert tuple of strings to DataStream[(String,String)]

2022-01-10 Thread Piotr Nowojski
Hi Sid,

I don't see on the stackoverflow explanation of what are you trying to do
here (no mentions of MapFunction or a tuple).

If you want to create a `DataStream<String>` from a pre-existing/static
Tuple of Strings, the easiest thing would be to convert the
tuple to a collection/iterator and use
`StreamExecutionEnvironment#fromCollection(...)`.
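
A minimal sketch of that first option (the tuple value here is an assumed example):

```
import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TupleToDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // a pre-existing/static tuple of strings (example value)
        Tuple2<String, String> tuple = Tuple2.of("a", "b");
        // turn the tuple fields into a collection and build a DataStream<String> from it
        DataStream<String> stream = env.fromCollection(Arrays.asList(tuple.f0, tuple.f1));
        stream.print();
        env.execute("tuple-to-datastream");
    }
}
```
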
If you already have a `DataStream<Tuple>` (for example your source
produces a tuple) and you want to flatten it to `DataStream<String>`, then
you need a simple `FlatMapFunction<Tuple, String>` (or
`RichFlatMapFunction<Tuple, String>`), that would do the flattening
via:

public void flatMap(Tuple value, Collector<String> out) throws
Exception {
  out.collect(value.f0);
  out.collect(value.f1);
  ...;
  out.collect(value.fN);
}

Best,
Piotrek

On Fri, Jan 7, 2022 at 07:05 Siddhesh Kalgaonkar 
wrote:

> Hi Francis,
>
> What I am trying to do is you can see over here
> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375?noredirect=1#comment124796734_70593375
>
>
> On Fri, Jan 7, 2022 at 5:07 AM Francis Conroy <
> francis.con...@switchdin.com> wrote:
>
>> Hi Siddhesh,
>>
>> How are you getting this tuple of strings into the system? I think this
>> is the important question, you can create a DataStream in many ways, from a
>> collection, from a source, etc but all of these rely on the
>> ExecutionEnvironment you're using.
>> A RichMapFunction doesn't produce a datastream directly, it's used in the
>> context of the StreamExecutionEnvironment to create a stream i.e.
>> DataStream.map([YourRichMapFunction]) this implies that you already need a
>> datastream to transform a datastream using a mapFunction
>> (MapFunction/RichMapFunction)
>> Francis
>>
>> On Fri, 7 Jan 2022 at 01:48, Siddhesh Kalgaonkar <
>> kalgaonkarsiddh...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> As I am new and I am facing one issue so I came across RichMapFunction.
>>> How can I use RichMapFunction to convert a tuple of strings to datastream?
>>> If not how can I do it apart from using StreamExecutionEnvironment?
>>>
>>> Thanks,
>>> Sid
>>>
>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>


Re: pyflink mixed with Java operators

2022-01-10 Thread Dian Fu
Hi,

You could try the following method:

```
from pyflink.java_gateway import get_gateway

jvm = get_gateway().jvm
ds = (
DataStream(ds._j_data_stream.map(jvm.com.example.MyJavaMapFunction()))
)
```

Regards,
Dian
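
For completeness, the Java class referenced above (`com.example.MyJavaMapFunction`) would just be an ordinary Flink MapFunction packaged into a jar and registered from Python via `env.add_jars(...)`. A minimal sketch; the package, class name, and the String element type are illustrative assumptions and must match what the Python stream actually carries:

```
package com.example;

import org.apache.flink.api.common.functions.MapFunction;

// Plain Flink MapFunction invoked through the Py4J gateway call shown above.
public class MyJavaMapFunction implements MapFunction<String, String> {
    @Override
    public String map(String value) {
        return value.toUpperCase();
    }
}
```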

On Fri, Jan 7, 2022 at 1:00 PM Francis Conroy 
wrote:

> Hi all,
>
> Does anyone know if it's possible to specify a java map function at some
> intermediate point in a pyflink job? In this case
>
> SimpleCountMeasurementsPerUUID
>
> is a flink java MapFunction. The reason we want to do this is that
> performance in pyflink seems quite poor.
> e.g.
>
> import logging
> import os
> import sys
> import zlib
>
> import Measurements_pb2
> from pyflink.common import Types
> from pyflink.common.serialization import 
> KafkaRecordSerializationSchemaBuilder, SimpleStringSchema
> from pyflink.datastream import StreamExecutionEnvironment, 
> RuntimeExecutionMode, MapFunction, RuntimeContext, \
> CheckpointingMode
> from pyflink.datastream.connectors import RMQConnectionConfig, RMQSource, 
> KafkaSink
>
> from functions.common import KeyByUUID
> from functions.file_lister import auto_load_python_files
> from customisations.serialisation import ZlibDeserializationSchema
>
>
> class ZlibDecompressor(MapFunction):
> def map(self, value):
> decomp = zlib.decompress(value[1])
> return value[0], decomp
>
>
> class MeasurementSnapshotCountMapFunction(MapFunction):
> def map(self, value):
> pb_body = Measurements_pb2.MeasurementSnapshot()
> pb_body.ParseFromString(value)
> meas_count = len(pb_body.measurements)
> if meas_count > 0:
> first_measurement = pb_body.measurements[0]
> point_uuid = first_measurement.point_uuid.value
> timestamp = first_measurement.time
>
> return timestamp, point_uuid, meas_count
>
> return None
>
>
> def word_count():
> env = StreamExecutionEnvironment.get_execution_environment()
> jarpath = 
> f"file://{os.getcwd()}/../src-java/switchdin-flink-serialization/target/serialization-1.0-SNAPSHOT.jar"
> env.add_jars(jarpath)
> auto_load_python_files(env)
> env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
> # write all the data to one file
> env.set_parallelism(1)
> env.enable_checkpointing(1000, CheckpointingMode.AT_LEAST_ONCE)
>
> connection_config = RMQConnectionConfig.Builder() \
> .set_host("rabbitmq") \
> .set_port(5672) \
> .set_virtual_host("/") \
> .set_user_name("guest") \
> .set_password("guest") \
> .set_connection_timeout(60) \
> .set_prefetch_count(5000) \
> .build()
>
> deserialization_schema = ZlibDeserializationSchema()
>
> stream = env.add_source(RMQSource(
> connection_config,
> "flink-test",
> False,
> deserialization_schema,
> )).set_parallelism(1)
>
> # compute word count
> dstream = 
> stream.map(MeasurementSnapshotCountMapFunction()).uid("DecompressRMQData") \
> .key_by(KeyByUUID(), key_type=Types.STRING()) \
> .jMap("org.switchdin.operators.SimpleCountMeasurementsPerUUID")  # 
> Hypothetical
>
> kafka_serialisation_schema = KafkaRecordSerializationSchemaBuilder() \
> .set_value_serialization_schema(SimpleStringSchema()) \
> .set_topic("flink-test-kafka") \
> .build()
>
> dstream.sink_to(
> KafkaSink.builder() \
> .set_record_serializer(kafka_serialisation_schema) \
> .set_bootstrap_servers("kafka:9092") \
> .build()
> )
>
> # submit for execution
> env.execute()
>
>
> if __name__ == '__main__':
> logging.basicConfig(stream=sys.stdout, level=logging.INFO, 
> format="%(message)s")
> word_count()
>
>
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Re: Job stuck in savePoint - entire topic replayed on restart.

2022-01-10 Thread Piotr Nowojski
Hi Basil,

1. What do you mean by:
> The only way we could stop these stuck jobs was to patch the finalizers.
?
2. Do you mean that your job is stuck when doing stop-with-savepoint?
3. What Flink version are you using? Have you tried upgrading to the most
recent version, or at least the most recent minor release? There have been
some bugs in the past with stop-with-savepoint, that have been fixed over
time. For example [1], [2] or [3]. Note that some of them might not be
related to your use case (Kinesis consumer or FLIP-27 sources).
4. If upgrading won't help, can you post stack traces of task managers that
contain the stuck operators/tasks?
5. If you are working on a version that has fixed all of those bugs, are
you using some custom operators/sources/sinks? If your code is either
capturing interrupts, or doing some blocking calls, it might be prone to
bugs similar to [2] (please check the discussion in the ticket for more
information).

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-21028
[2] https://issues.apache.org/jira/browse/FLINK-17170
[3] https://issues.apache.org/jira/browse/FLINK-21133

On Thu, Jan 6, 2022 at 16:23 Basil Bibi  wrote:

> Hi,
> We experienced a problem in production during a release.
> Our application is deployed to kubernetes using argocd and uses the Lyft
> flink operator.
> We tried to do a release and found that on deleting the application some
> of the jobs became stuck in "savepointing" phase.
> The only way we could stop these stuck jobs was to patch the finalizers.
> We deployed the new release and on startup our application had lost its
> offsets, so all of the messages in Kafka were replayed.
> Has anyone got any ideas how and why this happened and how we avoid it in
> the future?
> Sincerely Basil Bibi
>
>
>
> Authorised and regulated by the Financial Conduct Authority
>  (FCA) number 923700. *Humn.ai Ltd*, 12
> Hammersmith Grove, London, W6 7AP is a registered company number 11032616
> incorporated in the United Kingdom. Registered with the information
> commissioner’s office (ICO) number ZA504331.
>
> This message contains confidential information and is intended only for
> the individual(s) addressed in the message. If you aren't the named
> addressee, you should not disseminate, distribute, or copy this e-mail.
>


Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak,

We cannot set a specific 'value.deserializer' in the table options.
'key.deserializer' and 'value.deserializer' are always set to
'org.apache.kafka.common.serialization.ByteArrayDeserializer'.

If you want to implement a format, you could take a look at the code of
JsonFormatFactory.java in flink-formats/flink-json. The format will be
loaded via SPI.

Best,
Hang
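
For a rough idea of the shape of such a factory, here is a sketch modeled loosely on JsonFormatFactory. The identifier, class name, and the decoder body are placeholders you would fill in, and the class has to be registered in META-INF/services/org.apache.flink.table.factories.Factory so SPI can find it:

```
import java.util.Collections;
import java.util.Set;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

public class MyFormatFactory implements DeserializationFormatFactory {

    @Override
    public String factoryIdentifier() {
        return "my-format"; // referenced in DDL as 'value.format' = 'my-format'
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
            DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        return new DecodingFormat<DeserializationSchema<RowData>>() {
            @Override
            public DeserializationSchema<RowData> createRuntimeDecoder(
                    DynamicTableSource.Context sourceContext, DataType producedDataType) {
                // Placeholder: return your own DeserializationSchema<RowData>
                // that turns the raw Kafka bytes into RowData.
                throw new UnsupportedOperationException("plug in your deserializer here");
            }

            @Override
            public ChangelogMode getChangelogMode() {
                return ChangelogMode.insertOnly();
            }
        };
    }
}
```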

Ronak Beejawat (rbeejawa)  wrote on Mon, Jan 10, 2022 at 17:51:

> Hi Hang,
>
>
>
> My question is: can we use a specific 'value.deserializer' in the table options
> via the Kafka connector, or is there no way to do that? I have already kept
> 'value.format' in the code snippet below, so is that enough, with the
> deserializer handled internally?
>
> As for how to create a custom format, can you please share a link to a sample
> example for the same?
>
>
>
> Thanks
>
> Ronak Beejawat
>
>
>
>
>
>
>
> *From:* Hang Ruan 
> *Sent:* Monday, January 10, 2022 3:06 PM
> *To:* d...@flink.apache.org; Ronak Beejawat (rbeejawa) 
> *Cc:* commun...@flink.apache.org; user@flink.apache.org
> *Subject:* Re: Regarding Connector Options - value.deserializer
>
>
>
> Hi, Ronak,
>
>
>
> I think you should implement a custom format by yourself instead of
> overriding. The 'value.format' is a required table option.
>
>
>
> Best,
>
> Hang
>
>
>
> Ronak Beejawat (rbeejawa)  wrote on Mon, Jan 10, 2022 at 17:09:
>
> Hi Team,
>
> Is there any way to use value.deserializer in Connector Options for the Kafka
> connector via the SQL API?
>
> PFB the code snippet below:
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>  + "   org_id STRING\r\n"
>  + "   ,cluster_id STRING\r\n"
>  + "   ,globalcallid_callmanagerid STRING\r\n"
>  + "   ,globalcallid_callid INT\r\n"
>  + "   ,callidentifier INT\r\n"
>  + ",varvqmetrics STRING\r\n"
>  + ",duration INT\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 'kafka'\r\n"
>  + "   ,'topic' = 'cmr'\r\n"
>  + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>  + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>  + "   ,'properties.value.deserializer' = 'json'\r\n"
>  + "   ,'value.format' = 'json'\r\n"
>  + "   )");
>
>
> Thanks
> Ronak Beejawat
>
>


Re: How to reduce interval between Uptime Metric measurements?

2022-01-10 Thread Chesnay Schepler
I'd suggest double-checking whether you're actually scraping every 
second, or every minute.


To my knowledge the uptime metric is not periodically updated but always 
reflects the latest state when polled.


On 31/12/2021 16:37, Geldenhuys, Morgan Karl wrote:


Thanks for the hint, however I am not using the Prometheus push gateway.


Regards,

M.


*From:* Caizhi Weng 
*Sent:* 28 December 2021 02:17:34
*To:* Geldenhuys, Morgan Karl
*Cc:* user@flink.apache.org
*Subject:* Re: How to reduce interval between Uptime Metric 
measurements?

Hi!

Have you tried metrics.reporter.promgateway.interval? See [1] for more 
detail.


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/metric_reporters/#prometheuspushgateway


Geldenhuys, Morgan Karl  wrote on Tue, Dec 28, 2021 at 02:43:


Hello everyone,


I have a flink 1.14 job running and im looking at the uptime
metric (flink_jobmanager_job_uptime) together with prometheus
(scrape every second). It looks as if this metric is updated every
60 seconds, is there a way of decreasing this interval? A fixed
delay recovery strategy of 1s is being used. There doesnt seem to
be anything related to this in the configs

(https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/).



Regards,

M.



Re: Flink rest api to start a job

2022-01-10 Thread Chesnay Schepler
This is expected behavior. Since jar A is on the classpath you are able 
to access the entry-class of said jar. When you specify the jar id all 
that does is put another jar on the classpath; it does not enforce that 
the entry-class is loaded from said jar.


On 08/01/2022 16:45, Qihua Yang wrote:

Hi Yun,

Thank you for your reply! testB.jar doesn't have the same entry class 
as testA.jar.

So is it expected behavior? What is the theory behind?

Thanks,
Qihua

On Fri, Jan 7, 2022 at 4:27 PM Yun Gao  wrote:


Hi Qihua

Sorry may I double confirm that whether the entry class exists in
both testA and testB?

IF testA.jar is included on startup, it would be loaded in the
parent classloader, which
is the parent classloader for the user classloader that loads
testB. Thus at least if the
entry-class is exist only in testA, it should still be found.

Best,
Yun

--
Sender:Qihua Yang
Date:2022/01/07 02:55:09
Recipient:user
Theme:Flink rest api to start a job

Hi,

I found a weird behavior. We launched a k8s cluster without
job. But includes the jar A. I use Flink rest api to upload a
dummy jar(actually it can be any jar). Flink will create a jar
id. Then I use rest api to start the job with the jar A
entry-class. But the jar id is the dummy jar id. Flink will
start the job from jar A. Anyone know why?
My understanding is flink rest api should start the job from
the dummy jar, because jar id is dummy jar id that I uploaded.
Here are steps what I did:
1. deploy a k8s pod contains working jar(testA.jar)
1. flink rest api upload jar, testB.jar, flink generate jar
id, 2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar
2. flink rest api to runJar with testB.jar id, but testA.jar
entry-class.
3. flink start job from testA.jar

Thanks,
Qihua



Re: Request: Java 17 Support?

2022-01-10 Thread Chesnay Schepler

See https://issues.apache.org/jira/browse/FLINK-15736

Java 17 support is currently *not* expected for 1.15.

On 09/01/2022 23:21, Clayton Wohl wrote:
Are there any plans for Flink to support Java 17 and provide Java 
17-based Docker images?


There are a variety of new language/VM features we'd like to use and 
we were hoping Flink would support Java 17.


thanks
Kurt




Compatible alternative for ParquetInputFormat in Flink > 1.14.0

2022-01-10 Thread Meghajit Mazumdar
In flink-parquet_2.12 version 1.13.0, there used to be a class called
*org.apache.flink.formats.parquet.ParquetInputFormat*. This class's constructor used to
accept an org.apache.parquet.schema.MessageType and an org.apache.flink.core.fs.Path.

However, in flink-parquet_2.12 versions 1.14.0 and above, this class doesn't seem to be
present any more. There is a similar class, but its constructor args are quite different
and it requires us to specify the RowType.

I understand this was a major version upgrade. But I just wanted to confirm
whether there is any other class/method to achieve the same behavior as
*ParquetInputFormat* in 1.14.0+?


-- 
*Regards,*
*Meghajit*


Re: Regarding Connector Options - value.deserializer

2022-01-10 Thread Hang Ruan
Hi, Ronak,

I think you should implement a custom format by yourself instead of
overriding. The 'value.format' is a required table option.

Best,
Hang

Ronak Beejawat (rbeejawa)  wrote on Mon, Jan 10, 2022 at 17:09:

> Hi Team,
>
> Is there any way to use value.deserializer in Connector Options for the Kafka
> connector via the SQL API?
>
> PFB the code snippet below:
>
> tableEnv.executeSql("CREATE TABLE cmrTable (\r\n"
>  + "   org_id STRING\r\n"
>  + "   ,cluster_id STRING\r\n"
>  + "   ,globalcallid_callmanagerid STRING\r\n"
>  + "   ,globalcallid_callid INT\r\n"
>  + "   ,callidentifier INT\r\n"
>  + ",varvqmetrics STRING\r\n"
>  + ",duration INT\r\n"
>  + "   )\r\n"
>  + "   WITH (\r\n"
>  + "   'connector' = 'kafka'\r\n"
>  + "   ,'topic' = 'cmr'\r\n"
>  + "   ,'properties.bootstrap.servers' = '
> b-1.telemetry-msk-cluster.h1qn4w.c1.kafka.us-east-1.amazonaws.com:9092
> '\r\n"
>  + "   ,'scan.startup.mode' = 'earliest-offset'\r\n"
>  + "   ,'properties.value.deserializer' = 'json'\r\n"
>  + "   ,'value.format' = 'json'\r\n"
>  + "   )");
>
>
> Thanks
> Ronak Beejawat
>


Re: [ANNOUNCE] Apache Flink ML 2.0.0 released

2022-01-10 Thread Till Rohrmann
This is really great news. Thanks a lot for all the work Dong, Yun, Zhipeng
and others!

Cheers,
Till

On Fri, Jan 7, 2022 at 2:36 PM David Morávek  wrote:

> Great job! <3 Thanks Dong and Yun for managing the release and big thanks
> to everyone who has contributed!
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 2:27 PM Yun Gao  wrote:
>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink ML 2.0.0.
>>
>>
>>
>> Apache Flink ML provides API and infrastructure that simplifies
>> implementing distributed ML algorithms,
>>
>> and it also provides a library of off-the-shelf ML algorithms.
>>
>>
>>
>> Please check out the release blog post for an overview of the release:
>>
>> https://flink.apache.org/news/2022/01/07/release-ml-2.0.0.html
>>
>>
>>
>> The release is available for download at:
>>
>> https://flink.apache.org/downloads.html
>>
>>
>>
>> Maven artifacts for Flink ML can be found at:
>>
>> https://search.maven.org/search?q=g:org.apache.flink%20ml
>>
>>
>>
>> Python SDK for Flink ML published to the PyPI index can be found at:
>>
>> https://pypi.org/project/apache-flink-ml/
>>
>>
>>
>> The full release notes are available in Jira:
>>
>>
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351079
>>
>>
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>>
>>
>> Regards,
>>
>> Dong and Yun
>>
>


Re: unaligned checkpoint for job with large start delay

2022-01-10 Thread Piotr Nowojski
Hi Mason,

Sorry for a late reply, but I was OoO.

I think you could confirm it with more custom metrics. Counting how many
windows have been registered/fired and plotting that over time.
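
A minimal sketch of such a counter, as a trigger that wraps the actual trigger and counts FIRE results. The metric name and wrapper class are illustrative assumptions; merging calls are delegated so it can also be used with session windows:

```
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class CountingTrigger<W extends Window> extends Trigger<Object, W> {

    private final Trigger<Object, W> inner;
    private transient Counter fires;

    public CountingTrigger(Trigger<Object, W> inner) {
        this.inner = inner;
    }

    private TriggerResult track(TriggerResult result, TriggerContext ctx) {
        if (fires == null) {
            // lazily registered so the counter lives in the window operator's metric group
            fires = ctx.getMetricGroup().counter("windowFires");
        }
        if (result.isFire()) {
            fires.inc();
        }
        return result;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        return track(inner.onElement(element, timestamp, window, ctx), ctx);
    }

    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return track(inner.onProcessingTime(time, window, ctx), ctx);
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
        return track(inner.onEventTime(time, window, ctx), ctx);
    }

    @Override
    public boolean canMerge() {
        return inner.canMerge();
    }

    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        inner.onMerge(window, ctx);
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        inner.clear(window, ctx);
    }
}
```

It could then be attached with something like `.window(...).trigger(new CountingTrigger<>(EventTimeTrigger.create()))` and the counter plotted over time next to the checkpoint metrics.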

I think it would be more helpful in this case to check how long a task has
been blocked being "busy" processing for example timers. FLINK-25414 shows
only blocked on being hard/soft backpressure. Unfortunately at the moment I
don't know how to implement such a metric without affecting performance on
the critical path, so I don't see this happening soon :(

Best,
Piotrek

On Tue, Jan 4, 2022 at 18:02 Mason Chen  wrote:

> Hi Piotrek,
>
> In other words, something (presumably a watermark) has fired more than 151
> 200 windows at once, which is taking ~1h 10 minutes to process and during
> this time the checkpoint can not make any progress. Is this number of
> triggered windows plausible in your scenario?
>
>
> It seems plausible—there are potentially many keys (and many windows). Is
> there a way to confirm with metrics? We can add a window fire counter to
> the window operator that only gets incremented at the end of windows
> evaluation, in order to see the huge jumps in window fires. I can this
> benefiting other users who troubleshoot the problem of large number of
> window firing.
>
> Best,
> Mason
>
> On Dec 29, 2021, at 2:56 AM, Piotr Nowojski  wrote:
>
> Hi Mason,
>
> > and it has to finish processing this output before checkpoint can
> begin—is this right?
>
> Yes. Checkpoint will be only executed once all triggered windows will be
> fully processed.
>
> But from what you have posted it looks like all of that delay is
> coming from hundreds of thousands of windows firing all at the same time.
> Between 20:30 and ~21:40 there must have been a bit more than 36 triggers/s
> * 60 s/min * 70 min = 151 200 triggers fired at once (or in a very short
> interval). In other words, something (presumably a watermark) has fired
> more than 151 200 windows at once, which is taking ~1h 10 minutes to process
> and during this time the checkpoint can not make any progress. Is this
> number of triggered windows plausible in your scenario?
>
> Best,
> Piotrek
>
>
> On Thu, Dec 23, 2021 at 12:12 Mason Chen  wrote:
>
>> Hi Piotr,
>>
>> Thanks for the thorough response and the PR—will review later.
>>
>> Clarifications:
>> 1. The flat map you refer to produces at most 1 record.
>> 2. The session window operator’s *window process function* emits at
>> least 1 record.
>> 3. The 25 ms sleep is at the beginning of the window process function.
>>
>> Your explanation about how records being bigger than the buffer size can
>> cause blockage makes sense to me. However, my average record size is around 
>> 770
>> bytes coming out of the source and 960 bytes coming out of the window.
>> Also, we don’t override the default `taskmanager.memory.segment-size`. My
>> Flink job memory config is as follows:
>>
>> ```
>> taskmanager.memory.jvm-metaspace.size: 512 mb
>> taskmanager.memory.jvm-overhead.max: 2Gb
>> taskmanager.memory.jvm-overhead.min: 512Mb
>> taskmanager.memory.managed.fraction: '0.4'
>> taskmanager.memory.network.fraction: '0.2'
>> taskmanager.memory.network.max: 2Gb
>> taskmanager.memory.network.min: 200Mb
>> taskmanager.memory.process.size: 16Gb
>> taskmanager.numberOfTaskSlots: '4'
>> ```
>>
>>  Are you sure your job is making any progress? Are records being
>> processed? Hasn't your job simply deadlocked on something?
>>
>>
>> To distinguish task blockage vs graceful backpressure, I have checked the
>> operator throughput metrics and have confirmed that during window *task*
>> buffer blockage, the window *operator* DOES emit records. Tasks look
>> like they aren’t doing anything but the window is emitting records.
>>
>> 
>>
>>
>> Furthermore, I created a custom trigger to wrap a metric counter for
>> FIRED counts to get a estimation of how many windows are fired at the same
>> time. I ran a separate job with the same configs—the results look as
>> follows:
>> 
>>
>> On average, when the buffers are blocked, there are 36 FIREs per second.
>> Since each of these fires invokes the window process function, 25 ms * 36 =
>> 900 ms means we sleep almost a second cumulatively, per second—which is
>> pretty severe. Combined with the fact that the window process function can
>> emit many records, the task takes even longer to checkpoint since the
>> flatmap/kafka sink is chained with the window operator—and it has to finish
>> processing this output before checkpoint can begin—*is this right?* In
>> addition, when the window fires per second reduces, checkpoint is able to
>> continue and succeed.
>>
>> So, I think that the surge of window firing combined with the sleep is
>> the source of the issue, which makes sense. I’m not sure how to confirm
>> whether or not the points about buffer sizes being insufficient for the
>> window output is also interplaying with this issue.
>>
>> Best,
>> Mason
>>
>>
>> On Dec 22, 2021, at 6:17 AM, Piotr Nowojski  

Is there a way to know how long a Flink app takes to finish resuming from Savepoint?

2022-01-10 Thread Chen-Che Huang
Hi all,

I'm trying to speed up the process of resuming from a savepoint by
adjusting some configuration.
I wonder whether there exists a way to know how much time our Flink app
spends resuming from a savepoint?
From the logs, I can see only the starting time of the resuming (as shown
below) but couldn't find the end time of the resuming.
If there exists some metrics or information about the resuming time, it'd
be very helpful for the tuning.
Any comment is appreciated.

timestamp-1: Starting job  from savepoint
timestamp-2: Restoring job  from Savepoint

Best wishes,
Chen-Che Huang


Re: Is State TTL possible with event-time characteristics ?

2022-01-10 Thread jinzhong li
Hi Dan,

Currently, event-time TTL is not yet supported, due to implementation 
complexity and ambiguous semantics that need more discussion [1][2].

So people usually use state TTL with processing-time characteristics [3]. Could 
processing-time TTL state meet your requirements in your case?

If yes, you can refer to the Flink documentation [3][4] on how to use state TTL.

[1] https://issues.apache.org/jira/browse/FLINK-12005
[2] https://lists.apache.org/thread/8nhs2dfthh6whstn1j200tpq7pbjx1lh
[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
[4] https://flink.apache.org/2019/05/19/state-ttl.html
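
For reference, a minimal processing-time TTL setup along the lines of the documentation in [3]. The function, state name, and TTL duration are illustrative assumptions; TTL applies to keyed state, so this function must be used after keyBy(...):

```
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class TtlExample extends RichFlatMapFunction<String, String> {

    private transient ValueState<String> lastValue;

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.days(7)) // expires in processing time, 7 days after the last write
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        ValueStateDescriptor<String> descriptor =
                new ValueStateDescriptor<>("lastValue", String.class);
        descriptor.enableTimeToLive(ttlConfig);
        lastValue = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        lastValue.update(value);
        out.collect(value);
    }
}
```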

Best regards,
Jinzhong, Li


On 2022/01/10 04:39:43 Dan Hill wrote:
> Hi.  Any updates on this?
> 
> How do people usually do TTLs?  I want to add a backup TTL in case I have
> leaky state.
> 
> On Wed, Jun 17, 2020 at 6:08 AM Andrey Zagrebin 
> wrote:
> 
> > Hi Arti,
> >
> > Any program can use State with TTL but the state can only expire in
> > processing time at the moment even if you configure event-time
> > characteristics.
> > As Congxian mentioned, the event time for TTL is planned.
> >
> > The cleanup says that it will not be removed 'by default'. The following
> > sections [1] describe background cleanup which is not activated 'by
> > default' in 1.9 but in 1.10.
> > If you activate the background cleanup, you do not have to read the
> > expired state to clean it up as if you have those timers you mentioned.
> > See also the docs for details about background cleanup caveats.
> >
> > The timers approach is a valid way but heavy-weight in terms of storage
> > because Flink will have to create a separate state for timers:
> > key/timestamp.
> > The timers approach is not implemented in Flink out-of-the-box at the
> > moment. It can be implemented in the application as a simple background
> > cleanup.
> >
> > Best,
> > Andrey
> >
> > [1]
> > https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#cleanup-in-background
> >
> > On Wed, Jun 17, 2020 at 3:07 PM Congxian Qiu 
> > wrote:
> >
> >> Hi
> >>  Currently, Flink does not support event-time TTL state, there is an
> >> issue[1] tracking this.
> >> [1] https://issues.apache.org/jira/browse/FLINK-12005
> >> Best,
> >> Congxian
> >>
> >>
> >> Arti Pande  wrote on Wed, Jun 17, 2020 at 7:37 PM:
> >>
> >>> With Flink 1.9 is state TTL supported for event-time characteristics? This
> >>> part
> >>> 
> >>> of the documentation says that
> >>>
> >>>
> >>>-
> >>>
> >>>Only TTLs in reference to *processing time* are currently supported.
> >>>
> >>> Does this mean if a program uses event-time characteristics with
> >>> stateful operators, it can not use TTL ??
> >>>
> >>> Also clean up section
> >>> 
> >>>  of
> >>> the documentation says state values that are never read will never be
> >>> cleared.
> >>> [image: Screenshot 2020-06-17 at 5.00.41 PM.png]
> >>> The question is, when processing a stream with unique elements or
> >>> keys why would Flink framework expect the same key to be read in order for
> >>> it to be removed after its expiration time ? Why does it not simply clean
> >>> up the value for that key based on timers automatically without waiting 
> >>> for
> >>> read operation from user code?
> >>>
> >>> Thanks
> >>> Arti
> >>>
> >>
>