write into parquet with variable number of columns

2021-08-05 Thread Sharipov, Rinat
Hi mates !

I'm trying to find the best way to persist data into a columnar format
(Parquet) using Flink.
Each event contains a fixed list of properties and a variable list of
properties defined by the user.
I would like to save the user-defined properties into separate columns on
the fly.

Here is an example of my events:

[
  {
"eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
"timestamp": 123,
"attributes": {
  "gender": "male",
  "geo": "Germany"
}
  },
  {
"eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
"timestamp": 123,
"attributes": {
  "car": "big-cool-car",
  "phone": "iphone"
}
  }
]

As a result, I would like to have a table with the following columns

*eventId | timestamp | gender | geo | car | phone*

I've looked into the streaming file sink, but it requires defining a schema
before starting a job, which is not possible in my case.
I've also remembered the *explode* SQL function that could help me in
standard SQL, but it doesn't exist in the Flink Table API.

I have found that since version *1.13.0* Flink supports creating rows by
field names using *Row.withNames()*, so I guess this could be the key that
solves my problem. Here is what the Javadoc says:

*Name-based field mode: withNames() creates a variable-length row. The
fields can be accessed by name using getField(String) and setField(String,
Object). Every field is initialized during the first call to
setField(String, Object) for the given name. However, the framework will
initialize missing fields with null and reorder all fields once more type
information is available during serialization or input conversion. Thus,
even name-based rows eventually become fixed-length composite types with a
deterministic field order. Name-based rows perform worse than
position-based rows but simplify row creation and code readability.*

So it seems that all I need is to transform my events into records manually
and persist the resulting table into a file system, but my no-op demo
example fails with an exception. Here it is:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TestApp {

  public static void main(String[] args) {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // two name-based rows with intentionally different field sets
    Row row1 = Row.withNames();
    row1.setField("a", "fb1");
    row1.setField("b", "gmail1");
    row1.setField("c", "vk1");

    Row row2 = Row.withNames();
    row2.setField("b", "ok2");
    row2.setField("c", "gmail2");

    tableEnv.fromValues(row1, row2).printSchema();
  }
}

Here is a stack trace of the exception:

java.lang.IllegalArgumentException: Accessing a field by position is not supported in name-based field mode.
    at org.apache.flink.types.Row.getField(Row.java:257)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.flink.table.expressions.ApiExpressionUtils.convertRow(ApiExpressionUtils.java:123)
    at org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:103)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:359)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:334)


Maybe someone has tried this feature and can guess what's wrong with
the current code and how to make it work.

Anyway, I have a fallback - accumulate a batch of events, define the
schema for them and write them into the file system manually - but I still
hope that I can do this in a more elegant way.
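
To make that fallback concrete, here is a rough, untested sketch: once a
batch has been collected and the full set of attribute names is known, the
row type can be declared explicitly and the missing attributes padded with
null. The column names and types below are just the ones from my example,
and the actual Parquet sink is omitted:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ExplicitSchemaFallback {

  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    // the schema is built from the union of all attribute names seen in the batch:
    // eventId | timestamp | gender | geo | car | phone
    Table table = tableEnv.fromValues(
        DataTypes.ROW(
            DataTypes.FIELD("eventId", DataTypes.STRING()),
            DataTypes.FIELD("timestamp", DataTypes.BIGINT()),
            DataTypes.FIELD("gender", DataTypes.STRING()),
            DataTypes.FIELD("geo", DataTypes.STRING()),
            DataTypes.FIELD("car", DataTypes.STRING()),
            DataTypes.FIELD("phone", DataTypes.STRING())),
        // position-based rows, with null for the attributes an event does not carry
        Row.of("7e2b33c8-9c00-42ed-b780-dbbd8b0f6562", 123L, "male", "Germany", null, null),
        Row.of("7e2b33c8-9c00-42ed-b780-dbbd8b0f6562", 123L, null, null, "big-cool-car", "iphone"));

    table.printSchema();
  }
}

Printing the schema is enough here to check that the variable attributes end
up as ordinary nullable columns.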

Thx for your advice and time !


-- 
Best Regards,
*Sharipov Rinat*

CleverDATA
make your data clever


Re: [Flink::Test] access registered accumulators via harness

2020-10-30 Thread Sharipov, Rinat
Hi Dawid, thx for your reply and sorry for the ambiguous question !

You understood me correctly: I would like to get counter values by their
names after completing all operations with the harness component.

I suppose that this would be useful because most of our functions are
implementations of Flink functions and create their counters in the open
phase.

I'm expecting that the following API could be added to
AbstractStreamOperatorTestHarness
<https://github.com/apache/flink/blob/2dc872a8fc54116add41f0a1933dbd4a436819f0/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractBroadcastStreamOperatorTestHarness.java#L29>:

public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {

    // this would give us access to all registered counters, metric
    // sub-groups, gauges, etc.
    public MetricGroup getMetricGroup() {
        return environment.getMetricGroup();
    }
}

Here is an example of usage, based on your example

private static class MyProcessFunction extends ProcessFunction<IN, OUT> {

    private Counter myCustomCounter1;
    private Counter myCustomCounter2;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.myCustomCounter1 =
            getRuntimeContext().getMetricGroup().counter("myCustomCounter1");
        this.myCustomCounter2 =
            getRuntimeContext().getMetricGroup().counter("myCustomCounter2");
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
        if (checkCase1(value)) {
            myCustomCounter1.inc();
            return;
        }
        if (checkCase2(value)) {
            myCustomCounter2.inc();
            return;
        }

        out.collect(logic.doMyBusinessLogic(value));
    }
}

public static class TestMyProcessFunction {

    public void processElement_should_incCounter1() {
        ProcessFunctionTestHarness harness = ...;
        harness.processElement(element);

        assertThat(harness.getMetricGroup().counter("myCustomCounter1").getCount(),
            equalTo(1L));
        assertThat(harness.getMetricGroup().counter("myCustomCounter2").getCount(),
            equalTo(0L));
    }
}



What do you think about such a harness API proposal ?

Thx !
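
P.S. In the meantime I'm testing the business logic separately, along the
lines of your MyLogic suggestion. A minimal sketch of such a test, assuming
MyLogic increments the counter inside doMyBusinessLogic, and using
org.apache.flink.metrics.SimpleCounter instead of a mock:

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import org.apache.flink.metrics.SimpleCounter;
import org.junit.Test;

public class MyLogicTest {

    @Test
    public void doMyBusinessLogic_should_incrementCounter() {
        // SimpleCounter is a plain in-memory Counter from flink-metrics-core,
        // so no harness or metric registry is needed here
        SimpleCounter counter = new SimpleCounter();
        MyLogic<String, String> logic = new MyLogic<>(counter);

        logic.doMyBusinessLogic("some-value");

        assertThat(counter.getCount(), equalTo(1L));
    }
}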

Fri, Oct 30, 2020 at 12:54, Dawid Wysakowicz :

> Hi Rinat,
>
> First of all, sorry for some nitpicking in the beginning, but your message
> might be a bit misleading for some. If I understood your message correctly
> you are referring to Metrics, not accumulators, which are a different
> concept[1]. Or were you indeed referring to accumulators?
>
> Now on to the topic of accessing metrics. Personally, I don't think
> exposing metrics in the ProcessFunctionTestHarness is the right way to
> test this. The harness should primarily be used as a minimal execution
> environment for testing operators and such behaviours as e.g.
> checkpointing. I would not recommend using it for testing business logic
> and most definitely not metrics. I'd either test that in an IT test using
> a MiniCluster and a metric reporter you can assert on, or I'd separate the
> business logic from the setup logic. Something like:
> private static class MyProcessFunction extends ProcessFunction<IN, OUT> {
>
>     private MyLogic<IN, OUT> logic;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         this.logic = new
>             MyLogic<>(getRuntimeContext().getMetricGroup().counter("my-counter"));
>     }
>
>     @Override
>     public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
>         out.collect(logic.doMyBusinessLogic(value));
>     }
> }
>
> private static class MyLogic<IN, OUT> {
>
>     private final Counter counter;
>
>     public MyLogic(Counter counter) {
>         this.counter = counter;
>     }
>
>     public OUT doMyBusinessLogic(IN value) {
>         // do the processing
>     }
>
> }
>
> That way you can easily test your MyLogic class including interactions
> with the counter, by passing a mock counter.
>
> Best,
>
> Dawid
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#accumulators--counters
> On 27/10/2020 08:02, Sharipov, Rinat wrote:
>
> Hi mates !
>
> I guess that I'm doing something wrong, but I couldn't find a way to
> access registered accumulators and their values via the
> *org.apache.flink.streaming.util.ProcessFunctionTestHarness* wrapper
> that I'm using to test my functions.
>
> During the code research I've found that the required data is stored in
> *org.apac

[Flink::Test] access registered accumulators via harness

2020-10-27 Thread Sharipov, Rinat
Hi mates !

I guess that I'm doing something wrong, but I couldn't find a way to access
registered accumulators and their values via the
*org.apache.flink.streaming.util.ProcessFunctionTestHarness* wrapper that
I'm using to test my functions.

During the code research I've found that the required data is stored in the
*org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#metrics* field,
which is private and not accessible from tests. It's obvious that Flink
somehow accesses this field and exposes the counters in its Web UI.

So I hope someone can help me add a check for metric counts into my unit
tests, or, if there is no such ability, I'm ready to help implement it if
the community considers this acceptable.

Thx !


Re: PyFlink :: Bootstrap UDF function

2020-10-14 Thread Sharipov, Rinat
Hi Dian !

Thx a lot for your reply, it's very helpful for us.



Thu, Oct 15, 2020 at 04:30, Dian Fu :

> Hi Rinat,
>
> It's called in a single-threaded fashion and so there is no need for
> synchronization.
>
> Besides, there is a pair of open/close methods in the ScalarFunction and
> you could also override them and perform the initialization work in the
> open method.
>
> Regards,
> Dian
>
> On Oct 15, 2020, at 3:19 AM, Sharipov, Rinat wrote:
>
> Hi mates !
>
> I keep moving forward in my research of the new features of PyFlink and
> I'm really excited about this functionality.
> My main goal is to understand how to integrate our ML registry, powered by
> ML Flow, with PyFlink jobs and what restrictions we have.
>
> I need to bootstrap the UDF function on its startup, when it's
> instantiated in the Apache Beam process, but I don't know whether it's
> called by PyFlink in a single-threaded fashion or shared among multiple
> threads. In other words, I want to know whether I should care about
> synchronization of my bootstrap logic or not.
>
> Here is a code example of my UDF function:
>
> class MyFunction(ScalarFunction):
>
>     def __init__(self):
>         self.initialized = False
>
>     def __bootstrap(self):
>         return "bootstrapped"
>
>     def eval(self, urls):
>         if self.initialized:
>             self.__bootstrap()
>         return "my-result"
>
> my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())
>
>
> Thx a lot for your help !
>
>
>


PyFlink :: Bootstrap UDF function

2020-10-14 Thread Sharipov, Rinat
Hi mates !

I keep moving forward in my research of the new features of PyFlink and I'm
really excited about this functionality.
My main goal is to understand how to integrate our ML registry, powered by
ML Flow, with PyFlink jobs and what restrictions we have.

I need to bootstrap the UDF function on its startup, when it's instantiated
in the Apache Beam process, but I don't know whether it's called by PyFlink
in a single-threaded fashion or shared among multiple threads. In other
words, I want to know whether I should care about synchronization of my
bootstrap logic or not.

Here is a code example of my UDF function:

class MyFunction(ScalarFunction):

    def __init__(self):
        self.initialized = False

    def __bootstrap(self):
        return "bootstrapped"

    def eval(self, urls):
        if self.initialized:
            self.__bootstrap()
        return "my-result"


my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())


Thx a lot for your help !


Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi Xingbo, thx a lot, it works !

But I'm still sure that it's not obvious from a user's point of view that
*pyflink-shell.sh* doesn't use the provided flink-conf.yaml - don't you
think that looks like an issue ?

Thx !

Tue, Oct 13, 2020 at 05:35, Xingbo Huang :

> Hi,
>
> You can use the API to set the configuration:
>
> table_env.get_config().get_configuration().set_string(
>     "taskmanager.memory.task.off-heap.size", '80m')
>
> The flink-conf.yaml approach will only take effect when the job is
> submitted through flink run; the minicluster way (python xxx.py) will not
> pick it up.
>
> Best,
> Xingbo
>
> On Tue, Oct 13, 2020 at 1:56 AM, Sharipov, Rinat wrote:
>
>> Hi mates !
>>
>> I'm very new to PyFlink and trying to register a custom UDF function
>> using the Python API.
>> Currently I'm facing the issue both in the server environment and in my
>> local IDE environment.
>>
>> When I try to execute the example below I get the error message: *The
>> configured Task Off-Heap Memory 0 bytes is less than the least required
>> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
>> using the configuration key 'taskmanager.memory.task.off-heap.size*
>>
>> Of course I've added the required property into *flink-conf.yaml* and
>> checked that *pyflink-shell.sh* initializes the environment using the
>> specified configuration, but it has no effect and I still get the error.
>>
>> I've also attached my flink-conf.yaml file
>>
>> Thx for your help !
>>
>> *Here is an example:*
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import BatchTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def test_udf(i):
>>     return i
>>
>>
>> if __name__ == "__main__":
>>     env = ExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>
>>     bt_env = BatchTableEnvironment.create(env)
>>     bt_env.register_function("test_udf", test_udf)
>>
>>     my_table = bt_env.from_elements(
>>         [
>>             ("user-1", "http://url/1"),
>>             ("user-2", "http://url/2"),
>>             ("user-1", "http://url/3"),
>>             ("user-3", "http://url/4"),
>>             ("user-1", "http://url/3")
>>         ],
>>         ["uid", "url"]
>>     )
>>
>>     my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
>>     bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>>
>>     bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()
>>
>>
>>
>>


PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi mates !

I'm very new to PyFlink and trying to register a custom UDF function using
the Python API.
Currently I'm facing the issue both in the server environment and in my
local IDE environment.

When I try to execute the example below I get the error message: *The
configured Task Off-Heap Memory 0 bytes is less than the least required
Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
using the configuration key 'taskmanager.memory.task.off-heap.size*

Of course I've added the required property into *flink-conf.yaml* and
checked that *pyflink-shell.sh* initializes the environment using the
specified configuration, but it has no effect and I still get the error.

I've also attached my flink-conf.yaml file

Thx for your help !

*Here is an example:*

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
    return i


if __name__ == "__main__":
    env = ExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    bt_env = BatchTableEnvironment.create(env)
    bt_env.register_function("test_udf", test_udf)

    my_table = bt_env.from_elements(
        [
            ("user-1", "http://url/1"),
            ("user-2", "http://url/2"),
            ("user-1", "http://url/3"),
            ("user-3", "http://url/4"),
            ("user-1", "http://url/3")
        ],
        ["uid", "url"]
    )

    my_table_grouped_by_uid = my_table.group_by("uid").select("uid, collect(url) as urls")
    bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

    bt_env.execute_sql("select test_udf(uid) as uid, urls from my_temp_table").print()


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Dian, thx for your reply !

I was wondering about replacing the UDF on the fly from the Flink side; of
course, I'm pretty sure that it's possible to implement the update logic
directly in Python, thx for the idea.

Regards,
Rinat

Mon, Oct 12, 2020 at 14:20, Dian Fu :

> Hi Rinat,
>
> Do you want to replace the UDFs with new ones on the fly or just want to
> update the model which could be seen as instance variables inside the UDF?
>
> For the former case, it's not supported AFAIK.
> For the latter case, I think you could just update the model in the UDF
> periodically or according to some custom strategy. It's the behavior of the
> UDF.
>
> Regards,
> Dian
>
> On Oct 12, 2020, at 5:51 PM, Sharipov, Rinat wrote:
>
> Hi Arvid, thx for your reply.
>
> We are already using the approach with control streams to propagate
> business rules through our data-pipeline.
>
> Because all our models are powered by Python, I'm going to use Table API
> and register UDF functions, where each UDF is a separate model.
>
> So my question is - can I update the UDF function on the fly without a job
> restart ?
> Because new model versions become available on a daily basis and we should
> use them as soon as possible.
>
> Thx !
>
>
>
>
> Mon, Oct 12, 2020 at 11:32, Arvid Heise :
>
>> Hi Rinat,
>>
>> Which API are you using? If you use datastream API, the common way to
>> simulate side inputs (which is what you need) is to use a broadcast. There
>> is an example on SO [1].
>>
>> [1]
>> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>>
>> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
>> wrote:
>>
>>> Hi mates !
>>>
>>> I'm in the beginning of the road of building a recommendation pipeline
>>> on top of Flink.
>>> I'm going to register a list of UDF python functions on job
>>> startups where each UDF is an ML model.
>>>
>>> Over time new model versions appear in the ML registry and I would like
>>> to update my UDF functions on the fly without need to restart the whole job.
>>> Could you tell me, whether it's possible or not ? Maybe the community
>>> can give advice on how such tasks can be solved using Flink and what other
>>> approaches exist.
>>>
>>> Thanks a lot for your help and advice !
>>>
>>>
>>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Arvid, thx for your reply.

We are already using the approach with control streams to propagate
business rules through our data-pipeline.
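
Roughly, that control-stream part looks like the sketch below - a simplified
illustration of the broadcast-state pattern from the SO example, not our
production code; the stand-in sources and the single "active-rule" key are
just placeholders:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class ControlStreamSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.fromElements("event-1", "event-2"); // stand-in for the real event source
        DataStream<String> rules = env.fromElements("rule-v1", "rule-v2");  // stand-in for the control topic

        MapStateDescriptor<String, String> rulesDescriptor = new MapStateDescriptor<>(
                "business-rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

        events
                .connect(rules.broadcast(rulesDescriptor))
                .process(new BroadcastProcessFunction<String, String, String>() {

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // regular events see the latest broadcast rule (read-only access)
                        String rule = ctx.getBroadcastState(rulesDescriptor).get("active-rule");
                        out.collect(rule == null ? event : event + " [" + rule + "]");
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                        // every control message updates the broadcast state on all parallel instances
                        ctx.getBroadcastState(rulesDescriptor).put("active-rule", rule);
                    }
                })
                .print();

        env.execute("control-stream-sketch");
    }
}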

Because all our models are powered by Python, I'm going to use Table API
and register UDF functions, where each UDF is a separate model.

So my question is - can I update the UDF function on the fly without a job
restart ?
Because new model versions become available on a daily basis and we should
use them as soon as possible.

Thx !




Mon, Oct 12, 2020 at 11:32, Arvid Heise :

> Hi Rinat,
>
> Which API are you using? If you use datastream API, the common way to
> simulate side inputs (which is what you need) is to use a broadcast. There
> is an example on SO [1].
>
> [1]
> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>
> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
> wrote:
>
>> Hi mates !
>>
>> I'm in the beginning of the road of building a recommendation pipeline on
>> top of Flink.
>> I'm going to register a list of UDF python functions on job
>> startups where each UDF is an ML model.
>>
>> Over time new model versions appear in the ML registry and I would like
>> to update my UDF functions on the fly without need to restart the whole job.
>> Could you tell me, whether it's possible or not ? Maybe the community can
>> give advice on how such tasks can be solved using Flink and what other
>> approaches exist.
>>
>> Thanks a lot for your help and advice !
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [PyFlink] register udf functions with different versions of the same library in the same job

2020-10-11 Thread Sharipov, Rinat
Hi Xingbo ! Thx a lot for such a detailed reply, it is very useful.

Mon, Oct 12, 2020 at 09:32, Xingbo Huang :

> Hi,
> I will do my best to provide pyflink related content, I hope it helps you.
>
> >>> each udf function is a separate process, that is managed by Beam (but
> I'm not sure I got it right).
>
> Strictly speaking, it is not true that every UDF runs in a separate Python
> process. For example, the two Python functions in an expression such as
> udf1(udf2(a)) run in the same Python process, and you can even think of
> them as a single wrapped Python function udf1(udf2(a)). In fact, in most
> cases we put multiple Python UDFs together to improve performance.
>
> >>> Does it mean that I can register multiple udf functions with different
> versions of the same library or what would be even better with different
> python environments and they won't clash
>
> Currently all nodes of a PyFlink job use the same Python environment path,
> so there is no way to make each UDF use a different Python execution
> environment. Maybe you need to use multiple jobs to achieve this effect.
>
> Best,
> Xingbo
>
> On Sat, Oct 10, 2020 at 1:18 AM, Sharipov, Rinat wrote:
>
>> Hi mates !
>>
>> I've just read an amazing article
>> <https://medium.com/@Alibaba_Cloud/the-flink-ecosystem-a-quick-start-to-pyflink-6ad09560bf50>
>> about PyFlink and I'm absolutely delighted.
>> I got some questions about udf registration, and it seems that it's
>> possible to specify the list of libraries that should be used to evaluate
>> udf functions.
>>
>> As far as I understand, each udf function is a separate process, that is
>> managed by Beam (but I'm not sure I got it right).
>> Does it mean that I can register multiple udf functions with different
>> versions of the same library or what would be even better with different
>> python environments and they won't clash ?
>>
>> A few words about the task that I'm trying to solve: I would like to
>> build a recommendation pipeline that will accumulate features as a table
>> and make recommendations using models from the ML Flow registry. Since I
>> don't want to stop data analysts from using any libraries they want, the
>> best solution for me is to assemble the environment using a conda
>> descriptor and register a UDF function.
>>
>> Kubernetes and Kubeflow are not an option for us yet, so we are trying to
>> include models into existing pipelines.
>>
>> thx !
>>
>>
>>
>>
>>


[PyFlink] update udf functions on the fly

2020-10-10 Thread Sharipov, Rinat
Hi mates !

I'm at the beginning of the road of building a recommendation pipeline on
top of Flink.
I'm going to register a list of Python UDF functions on job startup, where
each UDF is an ML model.

Over time new model versions appear in the ML registry, and I would like to
update my UDF functions on the fly without the need to restart the whole
job.
Could you tell me whether it's possible or not ? Maybe the community can
give advice on how such tasks can be solved using Flink and what other
approaches exist.

Thanks a lot for your help and advice !


[PyFlink] register udf functions with different versions of the same library in the same job

2020-10-09 Thread Sharipov, Rinat
Hi mates !

I've just read an amazing article
<https://medium.com/@Alibaba_Cloud/the-flink-ecosystem-a-quick-start-to-pyflink-6ad09560bf50>
about PyFlink and I'm absolutely delighted.
I got some questions about udf registration, and it seems that it's
possible to specify the list of libraries that should be used to evaluate
udf functions.

As far as I understand, each udf function is a separate process that is
managed by Beam (but I'm not sure I got it right).
Does it mean that I can register multiple udf functions with different
versions of the same library - or, even better, with different Python
environments - and that they won't clash ?

A few words about the task that I'm trying to solve: I would like to build
a recommendation pipeline that will accumulate features as a table and make
recommendations using models from the ML Flow registry. Since I don't want
to stop data analysts from using any libraries they want, the best solution
for me is to assemble the environment using a conda descriptor and register
a UDF function.

Kubernetes and Kubeflow are not an option for us yet, so we are trying to
include models into existing pipelines.

thx !


“feedback loop” and checkpoints in iterative streams

2020-04-04 Thread Sharipov, Rinat
Hi mates, for some reason it's necessary to create a feedback loop in my
streaming application.

The best choice to implement it would have been an iterative stream, but at
the time the job was implemented (Flink version 1.6.1) iterations weren't
checkpointed.
So I decided to write the feedback output into Kafka instead.
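
A simplified sketch of that setup, assuming String payloads, the 0.11 Kafka
connector that ships with Flink 1.6, and placeholder topic / broker names;
needsAnotherPass() is just a stub for the real routing condition:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class KafkaFeedbackSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties kafka = new Properties();
        kafka.setProperty("bootstrap.servers", "localhost:9092");
        kafka.setProperty("group.id", "feedback-loop-demo");

        // the feedback edge is a regular Kafka topic, so it is covered by checkpoints
        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), kafka));
        DataStream<String> feedback = env.addSource(
                new FlinkKafkaConsumer011<>("feedback", new SimpleStringSchema(), kafka));

        DataStream<String> processed = input.union(feedback)
                .map(value -> value /* business logic goes here */);

        // records that need another pass go back through the feedback topic
        processed.filter(KafkaFeedbackSketch::needsAnotherPass)
                .addSink(new FlinkKafkaProducer011<>("feedback", new SimpleStringSchema(), kafka));

        env.execute("kafka-feedback-loop-sketch");
    }

    private static boolean needsAnotherPass(String value) {
        return false; // placeholder for the real routing condition
    }
}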

As far as I can see, nothing has changed so far (from the official docs):

> Please note that records in flight in the loop edges (and the state
changes associated with them) will be lost during failure.

Does the community have any plans to fix this behaviour ?

thx !