write into parquet with variable number of columns

2021-08-05 Thread Sharipov, Rinat
Hi mates !

I'm trying to find the best way to persist data into a columnar format
(parquet) using Flink.
Each event contains a fixed list of properties and a variable list of
properties defined by the user,
and I would like to save the user-defined properties into separate columns on
the fly.

Here is an example of my events:

[
  {
"eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
"timestamp": 123,
"attributes": {
  "gender": "male",
  "geo": "Germany"
}
  },
  {
"eventId": "7e2b33c8-9c00-42ed-b780-dbbd8b0f6562",
"timestamp": 123,
"attributes": {
  "car": "big-cool-car",
  "phone": "iphone"
}
  }
]

As a result, I would like to have a table with the following columns:

eventId | timestamp | gender | geo | car | phone

I've looked into the streaming file sink, but it requires defining a schema
before starting a job, which is not possible in my case.
I've also remembered the explode SQL function that could help me in standard
SQL, but it doesn't exist in the Flink Table API.

I have found that since version 1.13.0 Flink supports creating rows by
name using Row.withNames(), so I guess this could be the key that solves
my problem. Here is what the Javadoc says:

Name-based field mode: withNames() creates a variable-length row. The
fields can be accessed by name using getField(String) and setField(String,
Object). Every field is initialized during the first call to
setField(String, Object) for the given name. However, the framework will
initialize missing fields with null and reorder all fields once more type
information is available during serialization or input conversion. Thus,
even name-based rows eventually become fixed-length composite types with a
deterministic field order. Name-based rows perform worse than
position-based rows but simplify row creation and code readability.

So it seems that all I need is to transform my event into a record manually
and persist the resulting table into a file system, but my no-op demo
example fails with an exception; here it is:

public class TestApp {

  public static void main(String[] args) {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

    Row row1 = Row.withNames();
    row1.setField("a", "fb1");
    row1.setField("b", "gmail1");
    row1.setField("c", "vk1");

    Row row2 = Row.withNames();
    row2.setField("b", "ok2");
    row2.setField("c", "gmail2");

    tableEnv.fromValues(row1, row2).printSchema();
  }
}

Here is a stack trace of the exception:

java.lang.IllegalArgumentException: Accessing a field by position is not supported in name-based field mode.
    at org.apache.flink.types.Row.getField(Row.java:257)
    at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
    at org.apache.flink.table.expressions.ApiExpressionUtils.convertRow(ApiExpressionUtils.java:123)
    at org.apache.flink.table.expressions.ApiExpressionUtils.objectToExpression(ApiExpressionUtils.java:103)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:359)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.fromValues(TableEnvironmentImpl.java:334)


Maybe someone has tried this feature and can guess what's wrong with
the current code and how to make it work.

Anyway, I have a fallback: accumulate a batch of events, define the schema
for them and write them into the file system manually, but I still hope
that I can do this in a more elegant way.
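For what it's worth, here is roughly what that fallback looks like; it assumes
each event has already been flattened into a Map<String, String> from column
name to value (the Event type and the actual parquet writing are omitted):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

import org.apache.flink.types.Row;

public class BatchSchemaBuilder {

    public static List<Row> toRows(List<Map<String, String>> events) {
        // fixed columns first, then the union of all user-defined attributes
        LinkedHashSet<String> columns = new LinkedHashSet<>(Arrays.asList("eventId", "timestamp"));
        for (Map<String, String> event : events) {
            columns.addAll(event.keySet());
        }
        List<String> ordered = new ArrayList<>(columns);

        List<Row> rows = new ArrayList<>();
        for (Map<String, String> event : events) {
            Row row = new Row(ordered.size());
            for (int i = 0; i < ordered.size(); i++) {
                row.setField(i, event.get(ordered.get(i))); // null for missing attributes
            }
            rows.add(row);
        }
        return rows;
    }
}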

Thx for your advice and time !


-- 
Best Regards,
Sharipov Rinat

CleverDATA
make your data clever


Re: [Flink::Test] access registered accumulators via harness

2020-10-30 Thread Sharipov, Rinat
Hi Dawid, thx for your reply, and sorry for a question that allowed a double
interpretation !

You understood me correctly: I would like to get counter values by their
names after completing all operations with the harness component.

I suppose that it should be useful, because most of our functions are
implementations of Flink functions and create counters in the open phase.

I'm expecting that the following API can be useful in
AbstractStreamOperatorTestHarness
<https://github.com/apache/flink/blob/2dc872a8fc54116add41f0a1933dbd4a436819f0/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractBroadcastStreamOperatorTestHarness.java#L29>
:

public static class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable {

  // gives us access to all registered counters, metric sub-groups, gauges, etc.
  public MetricGroup getMetricGroup() {
    return environment.getMetricGroup();
  }
}

Here is a usage example, based on yours:

private static class MyProcessFunction extends ProcessFunction<IN, OUT> {

    private Counter myCustomCounter1;
    private Counter myCustomCounter2;

    @Override
    public void open(Configuration parameters) throws Exception {
        this.myCustomCounter1 = getRuntimeContext().getMetricGroup().counter("myCustomCounter1");
        this.myCustomCounter2 = getRuntimeContext().getMetricGroup().counter("myCustomCounter2");
    }

    @Override
    public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
        if (checkCase1(value)) {
            myCustomCounter1.inc();
            return;
        }
        if (checkCase2(value)) {
            myCustomCounter2.inc();
            return;
        }

        out.collect(logic.doMyBusinessLogic(value));
    }
}

public static class TestMyProcessFunction {

    public void processElement_should_incCounter1() {
        ProcessFunctionTestHarness<IN, OUT> harness = ...;
        harness.processElement(element);

        assertThat(harness.getMetricGroup().counter("myCustomCounter1").getCount(), equalTo(1L));
        assertThat(harness.getMetricGroup().counter("myCustomCounter2").getCount(), equalTo(0L));
    }
}



What do you think about such a harness API proposal ?

Thx !

On Fri, 30 Oct 2020 at 12:54, Dawid Wysakowicz wrote:

> Hi Rinat,
>
> First of all, sorry for some nitpicking in the beginning, but your message
> might be a bit misleading for some. If I understood your message correctly
> you are referring to Metrics, not accumulators, which are a different
> concept[1]. Or were you indeed referring to accumulators?
>
> Now on to the topic of accessing metrics. Personally I don't think exposing
> metrics in the ProcessFunctionTestHarness is the right way to test this. The
> harness should primarily be used as a minimal execution environment for testing
> operators and such behaviours as e.g. checkpointing. I would not recommend using
> it for testing business logic and most definitely not metrics. I'd either test
> that in an IT test using a MiniCluster and a metric reporter you can assert on,
> or I'd separate the business logic from the setup logic. Something like:
> private static class MyProcessFunction extends ProcessFunction<IN, OUT> {
>
>     private MyLogic<IN, OUT> logic;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         this.logic = new MyLogic<>(getRuntimeContext().getMetricGroup().counter("my-counter"));
>     }
>
>     @Override
>     public void processElement(IN value, Context ctx, Collector<OUT> out) throws Exception {
>         out.collect(logic.doMyBusinessLogic(value));
>     }
> }
>
>
> private static class MyLogic<IN, OUT> {
>
>     private final Counter counter;
>
>     public MyLogic(Counter counter) {
>         this.counter = counter;
>     }
>
>     public OUT doMyBusinessLogic(IN value) {
>         // do the processing
>     }
>
> }
>
> That way you can easily test your MyLogic class including interactions
> with the counter, by passing a mock counter.
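>
> For illustration, a minimal sketch of such a test (it uses Flink's SimpleCounter
> instead of a mock and assumes MyLogic increments the counter inside
> doMyBusinessLogic):
>
> import static org.hamcrest.MatcherAssert.assertThat;
> import static org.hamcrest.Matchers.equalTo;
>
> import org.apache.flink.metrics.SimpleCounter;
> import org.junit.Test;
>
> public class MyLogicTest {
>
>     @Test
>     public void doMyBusinessLogic_should_incrementCounter() {
>         SimpleCounter counter = new SimpleCounter();
>         MyLogic<String, String> logic = new MyLogic<>(counter);
>
>         logic.doMyBusinessLogic("some value");
>
>         // the business logic is expected to have bumped the counter exactly once
>         assertThat(counter.getCount(), equalTo(1L));
>     }
> }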
>
> Best,
>
> Dawid
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#accumulators--counters
> On 27/10/2020 08:02, Sharipov, Rinat wrote:
>
> Hi mates !
>
> I guess that I'm doing something wrong, but I couldn't find a way to
> access registered accumulators and their values via the
> org.apache.flink.streaming.util.ProcessFunctionTestHarness function
> wrapper that I'm using to test my functions.
>
> During the code research I've found that the required data is stored in the
> org.apache.flink.runtime.metrics.groups.Abstract

[Flink::Test] access registered accumulators via harness

2020-10-27 Thread Sharipov, Rinat
Hi mates !

I guess that I'm doing something wrong, but I couldn't find a way to access
registered accumulators and their values via the
org.apache.flink.streaming.util.ProcessFunctionTestHarness function
wrapper that I'm using to test my functions.

During the code research I've found that the required data is stored in the
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#metrics field,
which is private and not accessible from tests. It's obvious that Flink
somehow accesses this field and exposes the counters in its Web UI.

So I hope someone can help me add a check for metric counting into my unit tests,
or, if there is no such ability, I'm ready to help implement it if the community
considers this acceptable.

Thx !


Re: PyFlink :: Bootstrap UDF function

2020-10-15 Thread Sharipov, Rinat
Hi Dian !

Thx a lot for your reply, it's very helpful for us.



On Thu, 15 Oct 2020 at 04:30, Dian Fu wrote:

> Hi Rinat,
>
> It's called in a single-threaded fashion, so there is no need for
> synchronization.
>
> Besides, there is a pair of open/close methods in the ScalarFunction and
> you could also override them and perform the initialization work in the
> open method.
>
> Regards,
> Dian
>
> On 15 Oct 2020, at 03:19, Sharipov, Rinat wrote:
>
> Hi mates !
>
> I keep moving forward in my research of the new PyFlink features and I'm really
> excited about that functionality.
> My main goal is to understand how to integrate our ML registry, powered by
> ML Flow, with PyFlink jobs and what restrictions we have.
>
> I need to bootstrap the UDF function on startup, when it's
> instantiated in the Apache Beam process, but I don't know whether it's called
> by PyFlink in a single-threaded fashion or shared among multiple threads. In
> other words, I want to know whether I should care about synchronization of my
> bootstrap logic or not.
>
> Here is a code example of my UDF function:
>
> class MyFunction(ScalarFunction):
>
>     def __init__(self):
>         self.initialized = False
>
>     def __bootstrap(self):
>         return "bootstrapped"
>
>     def eval(self, urls):
>         if self.initialized:
>             self.__bootstrap()
>         return "my-result"
>
> my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())
>
>
> Thx a lot for your help !
>
>
>


PyFlink :: Bootstrap UDF function

2020-10-14 Thread Sharipov, Rinat
Hi mates !

I keep moving forward in my research of the new PyFlink features and I'm really
excited about that functionality.
My main goal is to understand how to integrate our ML registry, powered by
ML Flow, with PyFlink jobs and what restrictions we have.

I need to bootstrap the UDF function on startup, when it's instantiated
in the Apache Beam process, but I don't know whether it's called by PyFlink in a
single-threaded fashion or shared among multiple threads. In other words, I
want to know whether I should care about synchronization of my bootstrap logic or
not.

Here is a code example of my UDF function:













class MyFunction(ScalarFunction):

    def __init__(self):
        self.initialized = False

    def __bootstrap(self):
        return "bootstrapped"

    def eval(self, urls):
        if self.initialized:
            self.__bootstrap()
        return "my-result"


my_function = udf(MyFunction(), [DataTypes.ARRAY(DataTypes.STRING())], DataTypes.STRING())


Thx a lot for your help !


Re: PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-13 Thread Sharipov, Rinat
Hi Xingbo, thx a lot, it works !

But I'm still sure that it's not obvious from a user's point of view that
pyflink-shell.sh doesn't use the provided flink-conf.yaml; don't you think
that looks like an issue ?

Thx !

On Tue, 13 Oct 2020 at 05:35, Xingbo Huang wrote:

> Hi,
>
> You can use the API to set the configuration:
> table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
> '80m')
>
> The flink-conf.yaml way only takes effect when the job is submitted through
> flink run; the mini-cluster way (python xxx.py) will not pick it up.
>
> Best,
> Xingbo
>
> On Tue, 13 Oct 2020 at 01:56, Sharipov, Rinat wrote:
>
>> Hi mates !
>>
>> I'm very new at pyflink and trying to register a custom UDF function
>> using python API.
>> Currently I faced an issue in both server env and my local IDE
>> environment.
>>
>> When I try to execute the example below I get the error message: The
>> configured Task Off-Heap Memory 0 bytes is less than the least required
>> Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
>> using the configuration key 'taskmanager.memory.task.off-heap.size'.
>>
>> Of course I've added the required property into flink-conf.yaml and
>> checked that pyflink-shell.sh initializes the env using the specified
>> configuration, but it doesn't make any difference and I still have the error.
>>
>> I've also attached my flink-conf.yaml file
>>
>> Thx for your help !
>>
>> Here is an example:
>>
>> from pyflink.dataset import ExecutionEnvironment
>> from pyflink.table import BatchTableEnvironment, DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
>> def test_udf(i):
>> return i
>>
>>
>> if __name__ == "__main__":
>> env = ExecutionEnvironment.get_execution_environment()
>> env.set_parallelism(1)
>>
>> bt_env = BatchTableEnvironment.create(env)
>> bt_env.register_function("test_udf", test_udf)
>>
>> my_table = bt_env.from_elements(
>> [
>> ("user-1", "http://url/1;),
>> ("user-2", "http://url/2;),
>> ("user-1", "http://url/3;),
>> ("user-3", "http://url/4;),
>> ("user-1", "http://url/3;)
>> ],
>> [
>> "uid", "url"
>> ]
>> )
>>
>> my_table_grouped_by_uid = my_table.group_by("uid").select("uid, 
>> collect(url) as urls")
>> bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)
>>
>> bt_env.execute_sql("select test_udf(uid) as uid, urls from 
>> my_temp_table").print()
>>
>>
>>
>>


PyFlink 1.11.2 couldn’t configure [taskmanager.memory.task.off-heap.size] property when registering custom UDF function

2020-10-12 Thread Sharipov, Rinat
Hi mates !

I'm very new at pyflink and trying to register a custom UDF function using
python API.
Currently I faced an issue in both server env and my local IDE environment.

When I try to execute the example below I get the error message: The
configured Task Off-Heap Memory 0 bytes is less than the least required
Python worker Memory 79 mb. The Task Off-Heap Memory can be configured
using the configuration key 'taskmanager.memory.task.off-heap.size'.

Of course I've added the required property into flink-conf.yaml and checked
that pyflink-shell.sh initializes the env using the specified configuration, but
it doesn't make any difference and I still have the error.

I've also attached my flink-conf.yaml file

Thx for your help !

Here is an example:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, DataTypes
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def test_udf(i):
return i


if __name__ == "__main__":
env = ExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

bt_env = BatchTableEnvironment.create(env)
bt_env.register_function("test_udf", test_udf)

my_table = bt_env.from_elements(
[
("user-1", "http://url/1;),
("user-2", "http://url/2;),
("user-1", "http://url/3;),
("user-3", "http://url/4;),
("user-1", "http://url/3;)
],
[
"uid", "url"
]
)

my_table_grouped_by_uid = my_table.group_by("uid").select("uid,
collect(url) as urls")
bt_env.create_temporary_view("my_temp_table", my_table_grouped_by_uid)

bt_env.execute_sql("select test_udf(uid) as uid, urls from
my_temp_table").print()


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Dian, thx for your reply !

I was wondering about replacing the UDF on the fly from the Flink side; of course
I'm pretty sure that it's possible to implement the update logic directly in
Python, thx for the idea.

Regards,
Rinat

On Mon, 12 Oct 2020 at 14:20, Dian Fu wrote:

> Hi Rinat,
>
> Do you want to replace the UDFs with new ones on the fly or just want to
> update the model which could be seen as instance variables inside the UDF?
>
> For the former case, it's not supported AFAIK.
> For the latter case, I think you could just update the model in the UDF
> periodically or according to some custom strategy. It's the behavior of the
> UDF.
>
> Regards,
> Dian
>
> On 12 Oct 2020, at 17:51, Sharipov, Rinat wrote:
>
> Hi Arvid, thx for your reply.
>
> We are already using the approach with control streams to propagate
> business rules through our data-pipeline.
>
> Because all our models are powered by Python, I'm going to use Table API
> and register UDF functions, where each UDF is a separate model.
>
> So my question is - can I update the UDF function on the fly without a job
> restart ?
> Because new model versions become available on a daily basis and we should
> use them as soon as possible.
>
> Thx !
>
>
>
>
On Mon, 12 Oct 2020 at 11:32, Arvid Heise wrote:
>
>> Hi Rinat,
>>
>> Which API are you using? If you use datastream API, the common way to
>> simulate side inputs (which is what you need) is to use a broadcast. There
>> is an example on SO [1].
>>
>> [1]
>> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>>
>> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
>> wrote:
>>
>>> Hi mates !
>>>
>>> I'm in the beginning of the road of building a recommendation pipeline
>>> on top of Flink.
>>> I'm going to register a list of UDF python functions on job
>>> startups where each UDF is an ML model.
>>>
>>> Over time new model versions appear in the ML registry and I would like
>>> to update my UDF functions on the fly without need to restart the whole job.
>>> Could you tell me, whether it's possible or not ? Maybe the community
>>> can give advice on how such tasks can be solved using Flink and what other
>>> approaches exist.
>>>
>>> Thanks a lot for your help and advice !
>>>
>>>
>>>
>>
>> --
>> Arvid Heise | Senior Java Developer
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>> --
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>> Stream Processing | Event Driven | Real Time
>> --
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>
>


Re: [PyFlink] update udf functions on the fly

2020-10-12 Thread Sharipov, Rinat
Hi Arvid, thx for your reply.

We are already using the approach with control streams to propagate
business rules through our data-pipeline.

Because all our models are powered by Python, I'm going to use Table API
and register UDF functions, where each UDF is a separate model.

So my question is - can I update the UDF function on the fly without a job
restart ?
Because new model versions become available on a daily basis and we should
use them as soon as possible.

Thx !




On Mon, 12 Oct 2020 at 11:32, Arvid Heise wrote:

> Hi Rinat,
>
> Which API are you using? If you use datastream API, the common way to
> simulate side inputs (which is what you need) is to use a broadcast. There
> is an example on SO [1].
>
> [1]
> https://stackoverflow.com/questions/54667508/how-to-unit-test-broadcastprocessfunction-in-flink-when-processelement-depends-o
>
> On Sat, Oct 10, 2020 at 7:12 PM Sharipov, Rinat 
> wrote:
>
>> Hi mates !
>>
>> I'm in the beginning of the road of building a recommendation pipeline on
>> top of Flink.
>> I'm going to register a list of UDF python functions on job
>> startups where each UDF is an ML model.
>>
>> Over time new model versions appear in the ML registry and I would like
>> to update my UDF functions on the fly without need to restart the whole job.
>> Could you tell me, whether it's possible or not ? Maybe the community can
>> give advice on how such tasks can be solved using Flink and what other
>> approaches exist.
>>
>> Thanks a lot for your help and advice !
>>
>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: [PyFlink] register udf functions with different versions of the same library in the same job

2020-10-12 Thread Sharipov, Rinat
Hi Xingbo ! Thx a lot for such a detailed reply, it is very useful.

On Mon, 12 Oct 2020 at 09:32, Xingbo Huang wrote:

> Hi,
> I will do my best to provide pyflink related content, I hope it helps you.
>
> >>>  each udf function is a separate process, that is managed by Beam (but
> I'm not sure I got it right).
>
> Strictly speaking, it is not true that every UDF runs in a separate Python
> process. For example, the two Python functions udf1 and udf2 in
> udf1(udf2(a)) run in one Python process, and you can even think of it as a
> single wrapped Python function udf1(udf2(a)). In fact, in most cases we will
> put multiple Python UDFs together to improve performance.
>
> >>> Does it mean that I can register multiple udf functions with different
> versions of the same library or what would be even better with different
> python environments and they won't clash
>
> All nodes of a PyFlink job currently use the same Python environment path, so
> there is no way to make each UDF use a different Python execution
> environment. Maybe you need to use multiple jobs to achieve this effect.
>
> Best,
> Xingbo
>
> On Sat, 10 Oct 2020 at 01:18, Sharipov, Rinat wrote:
>
>> Hi mates !
>>
>> I've just read an amazing article
>> <https://medium.com/@Alibaba_Cloud/the-flink-ecosystem-a-quick-start-to-pyflink-6ad09560bf50>
>> about PyFlink and I'm absolutely delighted.
>> I got some questions about udf registration, and it seems that it's
>> possible to specify the list of libraries that should be used to evaluate
>> udf functions.
>>
>> As far as I understand, each udf function is a separate process, that is
>> managed by Beam (but I'm not sure I got it right).
>> Does it mean that I can register multiple udf functions with different
>> versions of the same library or what would be even better with different
>> python environments and they won't clash ?
>>
>> A few words about the task that I'm trying to solve: I would like to
>> build a recommendation pipeline that will accumulate features as a table
>> and make recommendations using models from the ML Flow registry. Since I
>> don't want to limit data analysts in the libraries they want to use, the
>> best solution for me is to assemble the environment using a conda
>> descriptor and register a UDF function.
>>
>> Kubernetes and Kubeflow are not an option for us yet, so we are trying to
>> include models into existing pipelines.
>>
>> thx !
>>
>>
>>
>>
>>


[PyFlink] update udf functions on the fly

2020-10-10 Thread Sharipov, Rinat
Hi mates !

I'm in the beginning of the road of building a recommendation pipeline on
top of Flink.
I'm going to register a list of Python UDF functions on job
startup, where each UDF is an ML model.

Over time new model versions appear in the ML registry and I would like to
update my UDF functions on the fly without need to restart the whole job.
Could you tell me, whether it's possible or not ? Maybe the community can
give advice on how such tasks can be solved using Flink and what other
approaches exist.

Thanks a lot for your help and advice !


[PyFlink] register udf functions with different versions of the same library in the same job

2020-10-09 Thread Sharipov, Rinat
Hi mates !

I've just read an amazing article about PyFlink and I'm absolutely delighted.
I got some questions about udf registration, and it seems that it's
possible to specify the list of libraries that should be used to evaluate
udf functions.

As far as I understand, each udf function is a separate process, that is
managed by Beam (but I'm not sure I got it right).
Does it mean that I can register multiple udf functions with different
versions of the same library or what would be even better with different
python environments and they won't clash ?

A few words about the task that I'm trying to solve: I would like to build
a recommendation pipeline that will accumulate features as a table and make
recommendations using models from the ML Flow registry. Since I don't want to
limit data analysts in the libraries they want to use, the best solution
for me is to assemble the environment using a conda descriptor and register a UDF
function.

Kubernetes and Kubeflow are not an option for us yet, so we are trying to
include models into existing pipelines.

thx !


“feedback loop” and checkpoints in itearative streams

2020-04-04 Thread Sharipov, Rinat
Hi mates, for some reason it's necessary to create a feedback loop in my
streaming application.

The natural choice to implement it was an iterative stream, but at the moment of
job implementation (Flink version 1.6.1) the loop edges weren't covered by
checkpointing, so I decided to write this output into Kafka instead.
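
Roughly, the loop now looks like the sketch below (topic name, schema and the
processing step are illustrative; it assumes the Kafka 0.11 connector that is
available for Flink 1.6):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

public class KafkaFeedbackLoop {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "feedback-loop");

        // the topic replaces the iteration edge, so in-flight records are covered
        // by Kafka offsets and checkpoints instead of being lost on failure
        DataStream<String> feedback = env.addSource(
                new FlinkKafkaConsumer011<>("feedback-topic", new SimpleStringSchema(), props));

        feedback
                .map(value -> value /* the actual processing goes here */)
                .addSink(new FlinkKafkaProducer011<>(
                        "localhost:9092", "feedback-topic", new SimpleStringSchema()));

        env.execute("kafka-feedback-loop");
    }
}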

As far as I can see, nothing has changed at the moment (from the official docs):

> Please note that records in flight in the loop edges (and the state
changes associated with them) will be lost during failure.

Does the community have any plans to fix this behaviour ?

thx !


StreamingFileSink with hdfs less than 2.7

2019-06-17 Thread Rinat
Hi mates, I decided to enable persisting the state of our Flink jobs that write 
data into HDFS, but ran into some trouble with that.

I'm trying to use StreamingFileSink with Cloudera Hadoop, whose version is 
2.6.5 and which doesn't provide the truncate method.

So the job fails immediately on startup while initializing 
HadoopRecoverableWriter, because it only works with a Hadoop FS version 
greater than or equal to 2.7.

Do you have any plans to support recovery for Hadoop file systems that don't 
provide the truncate method, or how can I work around this limitation ?

If no workaround exists, then the following behaviour would be good enough 
(see the sketch below):

- get the path to the file that should be restored
- get the valid-length from the state
- create a temporary directory and stream the restoring file into tmp 
  until the valid-length is reached
- replace the restoring file with the file from the tmp catalog
- move the file to the final state

what do you think about it ?
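
To make the idea above more concrete, a rough sketch of those recovery steps using
the plain Hadoop FileSystem API (the class and method names are mine, not an
existing Flink API):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ValidLengthRecovery {

    /** Rewrites the file so that only the first validLength bytes survive. */
    public static void recover(FileSystem fs, Path file, long validLength) throws Exception {
        Path tmp = new Path(file.getParent(), file.getName() + ".recovering");
        try (FSDataInputStream in = fs.open(file);
             FSDataOutputStream out = fs.create(tmp, true)) {
            // copy only the valid prefix of the restoring file into the tmp location
            IOUtils.copyBytes(in, out, validLength, false);
        }
        // replace the restoring file with the truncated copy
        fs.delete(file, false);
        fs.rename(tmp, file);
    }

    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        recover(fs, new Path(args[0]), Long.parseLong(args[1]));
    }
}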

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [Flink 1.6.1] _metadata file in retained checkpoint

2019-06-14 Thread Rinat
Hi Vasyl, thx for your reply, I’ll check

> On 10 Jun 2019, at 14:22, Vasyl Bervetskyi  wrote:
> 
> Hi Rinat,
>  
> A savepoint needs to be triggered when you want to create a point in time that 
> you can use in the future to roll back your state; you can also cancel the 
> job with a savepoint, which makes sure that you won't lose any data while 
> canceling the job.
>  
> About your question that Flink deletes the metadata file: I am using Flink 1.8.0 
> and Flink deletes the metadata file when I cancel a job with a savepoint. It seems 
> Flink doesn't see any reason to keep the checkpoint because you created a savepoint. 
> When you just cancel a job without a savepoint, Flink shouldn't delete the metadata 
> file from the checkpoint.
> From: Rinat  
> Sent: Wednesday, June 5, 2019 4:36 PM
> To: user 
> Subject: [Flink 1.6.1] _metadata file in retained checkpoint
>  
> Hi mates, I’m trying to configure my job to retain checkpoints on it’s 
> cancellation and got some troubles.
>  
> I got the following args why not to use savepoints:
>  
>  1. we already have all the job state on DFS in checkpoints directory
>  2. I can multiply size of the state on 2, when stopping the job, because the 
> state is already stored in checkpoints dir, and I’ll save it one more time 
> into savepoint dir
>  3. creation of checkpoints is incremental and savepoints - is not, so in my 
> case (10 Tb state) the process of savepoint creation will took too long time
>  4. As I know, we can rescale or job from retained checkpoints
>  
> I've configured my job to retain checkpoints on job cancelation and found an 
> interesting issue - _metadata file is removed, when job is cancelled.
>  
> So, I couldn’t restore my job from the retained checkpoint, is it an expected 
> behaviour ? If so, what is wrong ?
>  
>  
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
>  
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
>  
> CleverDATA
> make your data clever

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[Flink 1.6.1] _metadata file in retained checkpoint

2019-06-05 Thread Rinat
Hi mates, I’m trying to configure my job to retain checkpoints on it’s 
cancellation and got some troubles.

I got the following args why not to use savepoints:

 1. we already have all the job state on DFS in checkpoints directory
 2. I can multiply size of the state on 2, when stopping the job, because the 
state is already stored in checkpoints dir, and I’ll save it one more time into 
savepoint dir
 3. creation of checkpoints is incremental and savepoints - is not, so in my 
case (10 Tb state) the process of savepoint creation will took too long time
 4. As I know, we can rescale or job from retained checkpoints

I've configured my job to retain checkpoints on job cancelation and found an 
interesting issue - _metadata file is removed, when job is cancelled.

So, I couldn’t restore my job from the retained checkpoint, is it an expected 
behaviour ? If so, what is wrong ?
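
For reference, the retention itself is configured roughly like this (a minimal
sketch against the 1.6 API; the checkpoint interval is illustrative):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RetainedCheckpointsConfig {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);
        // keep the checkpoint directory (including _metadata) when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}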


Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



expose number of entries in map state as a metric

2019-03-07 Thread Rinat
Hi mates, I would like to expose the number of keys in MapState as a job metric

At first, I decided the Gauge metric was suitable for this purpose, but then I 
thought a little and decided that iterating over the whole state on each 
request to the Gauge would be too heavy.
So I decided to create a counter that is incremented each time an item is 
added into the state, and decremented when an item is removed by key.

After this, I've added timers into my job, registering a timer on each key. 
However, I couldn't pass any context into the timer call to remove only the 
specified key when the timer is triggered, so I'm scanning the whole
state and removing all entries whose expirationDate is less than or equal to 
the triggered time.

In the same method I'm decrementing my counter.

I've found that when multiple timers fire very often, my counter becomes less 
than 0. It means that I'm removing the same keys multiple times and decrementing 
the counter for a key more than once.

So I got the following questions (a stripped-down sketch of my current approach 
is below):

- what is the best approach to expose the number of keys in map state as a 
metric ?
- can I pass some context via timers to remove only specific keys after the 
specified amount of time ?
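
For context, here is the sketch of what I described above (names are illustrative,
expiration is simplified to processing time with a fixed TTL, and it runs on a
keyed stream):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class EntryCountingFunction extends ProcessFunction<String, String> {

    private static final long TTL_MILLIS = 60_000;

    private transient MapState<String, Long> entries; // entry -> expiration timestamp
    private transient Counter entryCounter;

    @Override
    public void open(Configuration parameters) throws Exception {
        entries = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("entries", String.class, Long.class));
        entryCounter = getRuntimeContext().getMetricGroup().counter("mapStateEntries");
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        long expiration = ctx.timerService().currentProcessingTime() + TTL_MILLIS;
        if (!entries.contains(value)) {
            entryCounter.inc(); // count only newly added entries
        }
        entries.put(value, expiration);
        ctx.timerService().registerProcessingTimeTimer(expiration);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // still a full scan, but each entry is removed (and the counter decremented) at most once
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, Long> entry : entries.entries()) {
            if (entry.getValue() <= timestamp) {
                expired.add(entry.getKey());
            }
        }
        for (String key : expired) {
            entries.remove(key);
            entryCounter.dec();
        }
    }
}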

Thx a lot

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



event time & watermarks in connected streams with broadcast state

2019-02-27 Thread Rinat
Hi mates, got some questions about using event time for the flink pipeline.

My pipeline consists of two connected streams, one is a stream with business 
rules and another one is a stream with user events. 

I’ve broadcast the stream with business rules and connected it to the stream of 
events, so I can apply all existing rules to each event.
For those purposes I’ve implemented a KeyedBroadcastProcessFunction that 
accumulates the broadcast state and applies the rules from it to each event.
In this function I would like to register event-time timers.

I’ve specified an AssignerWithPeriodicWatermarks for the stream of events that 
extracts the event timestamp and uses it as both timestamp and watermark, but still 
got no success, because the broadcast stream doesn’t have such an assigner and 
always returns Long.MIN_VALUE as the watermark, so Flink uses the smallest 
watermark received from both streams and event time is never advanced.

How can I solve this problem and use timestamps from event stream as a pipeline 
event time ?
Here is the configuration of my pipeline.

val bSegments = env
  .addSource(rules)
  .broadcast(CustomerJourneyProcessor.RULES_STATE_DESCRIPTOR)

val keyedEvents = env
  .addSource(events)
  .assignTimestampsAndWatermarks(eventTimeAssigner)
  .keyBy { event => event.getId.getGid }

keyedEvents
  .connect(bSegments)
  .process(customerJourneyProcessor)
  .addSink(sink)

I’ve found a workaround that works for me, but I’m not sure it’s a 
proper solution.

I can add a timestamp/watermark assigner to the stream of rules that always 
returns System.currentTimeMillis(); thereby it will always be bigger than the 
event timestamp, so the KeyedBroadcastProcessFunction
will use the event stream’s timestamps as the watermark.

class RuleTimestampAssigner extends AssignerWithPeriodicWatermarks[SegmentEvent] {

  override def getCurrentWatermark: Watermark = new Watermark(System.currentTimeMillis())

  override def extractTimestamp(rule: Rule, previousElementTimestamp: Long): Long = rule.created
}

But it looks like a hack and maybe someone can give an advice with the more 
convenient approach.

Thx !

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[flink :: connected-streams :: integration-tests]

2019-02-21 Thread Rinat
Hi mates, I got some troubles with the implementation of integration tests for 
the job, based on connected streams.

It has the following logic:

- I got two streams: the first one is a stream of rules, and the other one is a 
stream of events
- to apply events to each rule, I’ve implemented a KeyedBroadcastProcessFunction 
that broadcasts the set of rules received from the stream
- in processBroadcastElement I'm updating the broadcast state
- in the processElement method I’m evaluating all rules from the broadcast 
state using the input event
 
I would like to implement an integration test, that will send a rule into 
pipeline and then, when it’ll be added to the broadcast state, send an event 
and check, that the output item is a result of rule evaluation.

For the test needs, I’ve replaced the source functions with FromElementsFunction, 
which gives me bounded streams with pre-defined items that are passed through the 
pipeline. 

But I couldn’t figure out how to make sure the rules are processed before the 
events are sent; maybe you know some practices or workarounds to achieve such 
behaviour, or maybe I’m doing something wrong ?

Another approach, that is also suitable for me, is to initialize the broadcast 
state manually on job startup, but I still can’t find a way how to do that. 

Thx for your advices.
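
P.S. for reference, one crude workaround sketch for the ordering problem: wrap the
events with a source that waits before emitting, so the rules get a head start. It
is not deterministic, and the names here are illustrative:

import java.io.Serializable;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class DelayedElementsSource<T extends Serializable> implements SourceFunction<T> {

    private final List<T> elements;
    private final long delayMillis;
    private volatile boolean running = true;

    public DelayedElementsSource(List<T> elements, long delayMillis) {
        this.elements = elements;
        this.delayMillis = delayMillis;
    }

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        Thread.sleep(delayMillis); // give the rules stream time to be broadcast first
        for (T element : elements) {
            if (!running) {
                return;
            }
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(element);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}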

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: In-Memory state serialization with kryo fails

2019-02-15 Thread Rinat
Hi Gordon, thx for your time, I'll try to find another suitable serializer.
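
For the archive, here is roughly what I'll try based on the ListSerializer
suggestion quoted below (the element type is passed in as a placeholder; its own
serializer still comes from type extraction):

import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class StateDescriptors {

    public static <E> MapStateDescriptor<String, List<E>> progressStateDescriptor(
            Class<E> elementClass, ExecutionConfig config) {
        // the list level now uses Flink's ListSerializer instead of falling back to Kryo
        TypeSerializer<E> elementSerializer =
                TypeInformation.of(elementClass).createSerializer(config);
        return new MapStateDescriptor<>(
                "progress",
                StringSerializer.INSTANCE,
                new ListSerializer<>(elementSerializer));
    }
}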

> On 13 Feb 2019, at 07:25, Tzu-Li (Gordon) Tai  wrote:
> 
> Hi,
> 
> I would suggest to avoid Kryo for state serialization, especially if this job 
> is meant for production usage.
> It might get in the way in the future when you might decide to upgrade your 
> value state schema.
> 
> To do that, when declaring the descriptor for your MapState, provide a 
> specific serializer for your value ( java.util.List[SomeClass[_]]  ).
> You should be able to use Flink's ListSerializer for this. By providing a 
> specific serializer, this bypasses Flink's type extraction for your state 
> which determines to use the KryoSerializer as a fallback for unrecognizable 
> types.
> You can find more information about custom state serialization here [1].
> 
> Cheers,
> Gordon
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/custom_serialization.html>
> On Wed, Feb 13, 2019 at 2:56 AM Rinat  <mailto:r.shari...@cleverdata.ru>> wrote:
> Hi mates !
> 
> I’ve implemented a job, that stores it’s progress using MapState[K, V], where 
> K - is java.lang.String, and V - is a collection of some typed objects 
> java.util.List[SomeClass[_]]
> When Flink is trying to serialize this state, it is using kryo serializer for 
> value object and fails with StackOverflowException
> 
> java.lang.StackOverflowError
>   at java.util.HashMap.hash(HashMap.java:338)
>   at java.util.HashMap.get(HashMap.java:556)
>   at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:43)
>  
> This problem is related with the known bug in kryo 
> (https://github.com/EsotericSoftware/kryo/issues/341 
> <https://github.com/EsotericSoftware/kryo/issues/341>), and reveals itself 
> only when type of SomeClass is a java.util.BitSet. 
> 
> I’ve checked my job locally (from IDE) with latest (4.0.2 
> <https://mvnrepository.com/artifact/com.esotericsoftware/kryo/4.0.2>) kryo 
> lib, and it works fine, but I couldn’t change kryo version for distributed 
> mode, because it’s packaged into fat-jar (flink-dist_2.11-1.6.1.jar), that
> contains all runtime dependencies for Flink.
> 
> Maybe you can give me any advices, how to solve this issue, or register a 
> separate serializers for this case ?
> 
> Thx for your help.
> 
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[flink :: connected-streams :: integration-tests]

2019-02-15 Thread Rinat
Hi mates, I got some troubles with the implementation of integration tests for 
the job, based on connected streams.

It has the following logic:
I got two streams, first one is a stream of rules, and another one is a stream 
of events
to apply events on each rule, I’ve implemented a KeyedBroadcastProcessFunction 
, that broadcasts the set of rules, received from the stream
in the processBroadcastElement I'm updating the broadcast state
in the processElement method I’m evaluating all rules, from the broadcast 
state, using input event
 
I would like to implement an integration test, that will send a rule into 
pipeline and then, when it’ll be added to the broadcast state, send an event 
and check, that the output item is a result of rule evaluation.

For the test needs, I’ve replaced source functions with FromElementsFunction, 
that gives me a bounded streams with pre-defined items that will be passed over 
pipeline. 

But I couldn’t understand, how I can pass rules before sending events, maybe 
you know some practises or workarounds, how to achieve such behaviour, or maybe 
I’m doing something wrong ?

Another approach, that is also suitable for me, is to initialize a broadcast 
state manually on job startup, but I still can’t find a way how to do that. 

Thx for your advices.

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



In-Memory state serialization with kryo fails

2019-02-12 Thread Rinat
Hi mates !

I’ve implemented a job that stores its progress using MapState[K, V], where K 
is java.lang.String and V is a collection of some typed objects, 
java.util.List[SomeClass[_]].
When Flink tries to serialize this state, it uses the Kryo serializer for the 
value object and fails with a StackOverflowError:

java.lang.StackOverflowError
at java.util.HashMap.hash(HashMap.java:338)
at java.util.HashMap.get(HashMap.java:556)
at com.esotericsoftware.kryo.Generics.getConcreteClass(Generics.java:43)
 
This problem is related to a known bug in Kryo 
(https://github.com/EsotericSoftware/kryo/issues/341), and reveals itself only 
when the type parameter of SomeClass is java.util.BitSet. 

I’ve checked my job locally (from the IDE) with the latest (4.0.2 
<https://mvnrepository.com/artifact/com.esotericsoftware/kryo/4.0.2>) Kryo lib, 
and it works fine, but I couldn’t change the Kryo version for distributed mode, 
because it’s packaged into the fat jar (flink-dist_2.11-1.6.1.jar) that
contains all runtime dependencies for Flink.

Maybe you can give me advice on how to solve this issue, or how to register a 
separate serializer for this case ?

Thx for your help.


Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[Flink 1.6.1] local app infinitely hangs

2018-10-15 Thread Rinat
e.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:373)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:410)
at 
org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at 
org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.WordSpecLike$class.runTests(WordSpecLike.scala:1147)
at org.scalatest.WordSpec.runTests(WordSpec.scala:1881)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at 
org.scalatest.WordSpec.org$scalatest$WordSpecLike$$super$run(WordSpec.scala:1881)
at 
org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
at 
org.scalatest.WordSpecLike$$anonfun$run$1.apply(WordSpecLike.scala:1192)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.WordSpecLike$class.run(WordSpecLike.scala:1192)
at org.scalatest.WordSpec.run(WordSpec.scala:1881)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at 
org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
at 
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at 
org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at 
org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
at 
org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at 
org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

   Locked ownable synchronizers:
- None

"VM Thread" os_prio=31 tid=0x7f9b6a01b800 nid=0x2d03 runnable 

"GC task thread#0 (ParallelGC)" os_prio=31 tid=0x7f9b6900b000 nid=0x2503 
runnable 

"GC task thread#1 (ParallelGC)" os_prio=31 tid=0x7f9b6900c000 nid=0x2703 
runnable 

"GC task thread#2 (ParallelGC)" os_prio=31 tid=0x7f9b6900c800 nid=0x2903 
runnable 

"GC task thread#3 (ParallelGC)" os_prio=31 tid=0x7f9b6900d000 nid=0x2b03 
runnable 

"VM Periodic Task Thread" os_prio=31 tid=0x7f9b68a67000 nid=0x4d03 waiting 
on condition 

JNI global references: 682


Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [BucketingSink] notify on moving into pending/ final state

2018-10-11 Thread Rinat
Hi Piotr, during the migration to the latest Flink version, we’ve decided to 
try to contribute this functionality to the master branch.

PR is available here https://github.com/apache/flink/pull/6824 
More details about hooking the state changes in BucketingSink are available in 
https://issues.apache.org/jira/browse/FLINK-9592 

Thx !

> On 14 Jun 2018, at 23:29, Rinat  wrote:
> 
> Hi Piotr, I’ve create an issue 
> https://issues.apache.org/jira/browse/FLINK-9592 
> <https://issues.apache.org/jira/browse/FLINK-9592>
> 
> The third proposal looks great, may I try to contribute this issue ?
> 
>> On 14 Jun 2018, at 12:29, Piotr Nowojski > <mailto:pi...@data-artisans.com>> wrote:
>> 
>> Hi,
>> 
>> Couple of things:
>> 
>> 1. Please create a Jira ticket with this proposal, so we can move discussion 
>> from user mailing list.
>> 
>> I haven’t thought it through, so take my comments with a grain of salt, 
>> however:
>> 
>> 2. If we were to go with such callback, I would prefer to have one 
>> BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, 
> `onPendingToFinal`, `onPendingToCancelled(…)`, etc, as opposed to having one 
> interface passed three or four times for different purposes.
>> 
>> 3. Other thing that I had in mind is that BucketingSink could be rewritten 
>> to extend TwoPhaseCommitSinkFunction. In that case, with 
>> 
>> public class BucketingSink2 extends TwoPhaseCommitSinkFunction
>> 
>> user could add his own hooks by overriding following methods
>> 
>> BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
>> BucketingSink2#commit, BucketingSink2#abort. For example:
>> 
>> public class MyBucketingSink extends BucketingSink2 {
>>   @Override
>>   protected void  commit(??? txn) {
>> super.commit(txn);
>> // My hook on moving file from pending to commit state
>>   };
>> }
>> 
>> Alternatively, we could implement before mentioned callbacks support in 
>> TwoPhaseCommitSinkFunction and provide such feature to 
>> Kafka/Pravega/BucketingSink at once.
>> 
>> Piotrek
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [deserialization schema] skip data, that couldn't be properly deserialized

2018-10-10 Thread Rinat
Hi Fabian, I have created the issue, 
https://issues.apache.org/jira/browse/FLINK-10525

Thx !

> On 10 Oct 2018, at 16:47, Fabian Hueske  wrote:
> 
> Hi Rinat,
> 
> Thanks for discussing this idea. Yes, I think this would be a good feature. 
> Can you open a Jira issue and describe the feature?
> 
> Thanks, Fabian
> 
> Am Do., 4. Okt. 2018 um 19:28 Uhr schrieb Rinat  <mailto:r.shari...@cleverdata.ru>>:
> Hi mates, in accordance with the contract of 
> org.apache.flink.formats.avro.DeserializationSchema, it should return null 
> value, when content couldn’t be deserialized.
> But in most cases (for example 
> org.apache.flink.formats.avro.AvroDeserializationSchema) method fails if data 
> is corrupted. 
> 
> We’ve implemented our own SerDe class, that returns null, if data doesn’t 
> satisfy avro schema, but it’s rather hard to maintain this functionality 
> during migration to the latest Flink version.
> What do you think, maybe it’ll be useful if we will support optional skip of 
> failed records in avro and other Deserializers in the source code ?
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[deserialization schema] skip data, that couldn't be properly deserialized

2018-10-04 Thread Rinat
Hi mates, in accordance with the contract of 
org.apache.flink.formats.avro.DeserializationSchema, it should return a null 
value when the content couldn’t be deserialized.
But in most cases (for example 
org.apache.flink.formats.avro.AvroDeserializationSchema) the method fails if 
the data is corrupted. 

We’ve implemented our own SerDe class that returns null if the data doesn’t 
satisfy the Avro schema, but it’s rather hard to maintain this functionality 
during migration to the latest Flink version.
What do you think - maybe it would be useful to support optional skipping of 
failed records in the Avro and other deserializers in the source code ?
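
To make the idea more concrete, here is a rough sketch of the kind of wrapper I 
mean (not the actual class we use internally; it simply delegates and returns 
null for corrupted payloads):

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class SkipCorruptedRecordsSchema<T> implements DeserializationSchema<T> {

    private final DeserializationSchema<T> delegate;

    public SkipCorruptedRecordsSchema(DeserializationSchema<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        try {
            return delegate.deserialize(message);
        } catch (Exception e) {
            // corrupted payload: skip the record instead of failing the job
            return null;
        }
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return delegate.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return delegate.getProducedType();
    }
}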

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-07-06 Thread Rinat
Hi Mingey !

I’ve implemented a group of tests that shows the problem exists only when a 
part suffix is specified and a file in the pending state exists.

Here is the exception:

testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest)
  Time elapsed: 0.018 sec  <<< ERROR!
java.io.IOException: File already exists: 
/var/folders/v9/r7ybtp9n4lj_6ybx5xnngyzmgn/T/junit8543902037302786417/junit2291904425846970077/part-0-0.my.in-progress
at 
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:259)
at 
org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:252)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at 
org.apache.flink.streaming.connectors.fs.StreamWriterBase.open(StreamWriterBase.java:71)
at 
org.apache.flink.streaming.connectors.fs.StringWriter.open(StringWriter.java:69)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:587)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:458)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.processElement(OneInputStreamOperatorTestHarness.java:111)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncremented(BucketingSinkTest.java:970)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState(BucketingSinkTest.java:909)


You could add the following test to the 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSinkTest.class

@Test//(expected = IOException.class)
   public void 
testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState()
  throws Exception {
  testThatPartIndexIsIncremented(".my", "part-0-0.my" + IN_PROGRESS_SUFFIX);
   }

   private void testThatPartIndexIsIncremented(String partSuffix, String 
existingPartFile) throws Exception {
  File outDir = tempFolder.newFolder();
  long inactivityInterval = 100;

  java.nio.file.Path bucket = Paths.get(outDir.getPath());
  Files.createFile(bucket.resolve(existingPartFile));

  String basePath = outDir.getAbsolutePath();
  BucketingSink<String> sink = new BucketingSink<>(basePath)
 .setBucketer(new BasePathBucketer<>())
 .setInactiveBucketCheckInterval(inactivityInterval)
 .setInactiveBucketThreshold(inactivityInterval)
 .setPartPrefix(PART_PREFIX)
 .setInProgressPrefix("")
 .setPendingPrefix("")
 .setValidLengthPrefix("")
 .setInProgressSuffix(IN_PROGRESS_SUFFIX)
 .setPendingSuffix(PENDING_SUFFIX)
 .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
 .setPartSuffix(partSuffix)
 .setBatchSize(0);

  OneInputStreamOperatorTestHarness<String, Object> testHarness = createTestSink(sink, 1, 0);
  testHarness.setup();
  testHarness.open();

  testHarness.setProcessingTime(0L);

  testHarness.processElement(new StreamRecord<>("test1", 1L));

  testHarness.setProcessingTime(101L);
  testHarness.snapshot(0, 0);
  testHarness.notifyOfCompletedCheckpoint(0);
  sink.close();

  String expectedFileName = partSuffix == null ? "part-0-1" : "part-0-1" + 
partSuffix;
//assertThat(Files.exists(bucket.resolve(expectedFileName)), is(true));
   }

And check, that test fails

it’s actual for the current master branch, also I’ve implemented a PR, that 
fixes this problem (https://github.com/apache/flink/pull/6176 
<https://github.com/apache/flink/pull/6176>)

For some reasons, I still couldn’t compile the whole flink repository, to run 
your example locally from IDE, but from my point of view, problem exists, and 
the following test shows it’s existance, please, have a look

I’m working on flink project assembly on my local machine …

Thx


> On 25 Jun 2018, at 10:44, Rinat  wrote:
> 
> Hi Mingey !
> 
> Thx for your reply, really, have no idea why everything works in your case, I 
> have implemented unit tests in my PR which shows, that problem exists. 
> Please, let me know which Flink version do you use ?
> Current fix is actual for current master branch, here it an example of unit 
> test, that shows the problem
> 
> @Test
> public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws 
> Exception {
>String partSuffix = 

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-25 Thread Rinat
Hi Mingey !

Thx for your reply; really, I have no idea why everything works in your case. I 
have implemented unit tests in my PR which show that the problem exists. Please 
let me know which Flink version you use.
The current fix targets the current master branch; here is an example of a unit 
test that shows the problem:

@Test
public void testThatPartIndexIsIncrementedWhenPartSuffixIsSpecified() throws 
Exception {
   String partSuffix = ".my";

   File outDir = tempFolder.newFolder();
   long inactivityInterval = 100;

   java.nio.file.Path bucket = Paths.get(outDir.getPath());
   Files.createFile(bucket.resolve("part-0-0.my.pending"));

   String basePath = outDir.getAbsolutePath();
   BucketingSink<String> sink = new BucketingSink<>(basePath)
  .setBucketer(new BasePathBucketer<>())
  .setInactiveBucketCheckInterval(inactivityInterval)
  .setInactiveBucketThreshold(inactivityInterval)
  .setPartPrefix(PART_PREFIX)
  .setInProgressPrefix("")
  .setPendingPrefix("")
  .setValidLengthPrefix("")
  .setInProgressSuffix(IN_PROGRESS_SUFFIX)
  .setPendingSuffix(PENDING_SUFFIX)
  .setValidLengthSuffix(VALID_LENGTH_SUFFIX)
  .setPartSuffix(partSuffix)
  .setBatchSize(0);

   OneInputStreamOperatorTestHarness testHarness = 
createTestSink(sink, 1, 0);
   testHarness.setup();
   testHarness.open();

   testHarness.setProcessingTime(0L);

   testHarness.processElement(new StreamRecord<>("test1", 1L));

   testHarness.setProcessingTime(101L);
   testHarness.snapshot(0, 0);
   testHarness.notifyOfCompletedCheckpoint(0);
   sink.close();

   assertThat(Files.exists(bucket.resolve("part-0-1")), is(true));
}

> On 24 Jun 2018, at 06:02, zhangminglei <18717838...@163.com> wrote:
> 
> Hi, Rinat
> 
> I tried the situation you described and it works fine for me. The partCounter 
> is incremented as we hope. When a new part file is created, I did not see any 
> duplicated part index. Here is my code for that, you can take a look.
> In my case, the max index of the part files is part-0-683PartSuffix; other than 
> that, everything still stays in _part-0-684PartSuffix.pending,  
> _part-0-685PartSuffix.pending and so on, since the checkpoint has not finished.
> 
> Cheers
> Minglei.
> 
> public class TestSuffix {
>    public static void main(String[] args) throws Exception {
>       ParameterTool params = ParameterTool.fromArgs(args);
>       String outputPath = params.getRequired("outputPath");
> 
>       StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> 
>       sEnv.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"));
>       sEnv.enableCheckpointing(200);
>       sEnv.setParallelism(1);
> 
>       BucketingSink<Tuple4<Integer, String, String, Integer>> sink =
>             new BucketingSink<Tuple4<Integer, String, String, Integer>>(outputPath)
>                   .setInactiveBucketThreshold(1000)
>                   .setInactiveBucketCheckInterval(1000)
>                   .setPartSuffix("PartSuffix")
>                   .setBatchSize(500);
> 
>       sEnv.addSource(new DataGenerator())
>             .keyBy(0)
>             .map(new CountUpRichMap())
>             .addSink(sink);
> 
>       sEnv.execute();
>    }
> 
>    public static class CountUpRichMap extends RichMapFunction<Tuple3<Integer, String, String>, Tuple4<Integer, String, String, Integer>> {
> 
>       private ValueState<Integer> counter;
> 
>       @Override
>       public void open(Configuration parameters) throws Exception {
>          counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Types.INT));
>       }
> 
>       @Override
>       public Tuple4<Integer, String, String, Integer> map(Tuple3<Integer, String, String> value) throws Exception {
>          Integer counterValue = counter.value();
>          if (counterValue == null) {
>             counterValue = 0;
>          }
>          counter.update(counterValue + 1);
>          return Tuple4.of(value.f0, value.f1, value.f2, counterValue);
>       }
>    }
> 
>    public static class DataGenerator implements SourceFunction<Tuple3<Integer, String, String>> {
> 
>       public DataGenerator() {
>       }
> 
>       @Override
>       public void run(SourceContext<Tuple3<Integer, String, String>> ctx) throws Exception {
>          for (int i = 0; i < 1; i++) {
>             ctx.collect(Tuple3.of(i % 10, UUID.randomUUID().toString(), "Some payloads.."));
>          }
>       }
> 
>       @Override
>       public void cancel() {
> 
>       }
>    }
> }
> 
> 
> 
> 

Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-23 Thread Rinat
Hi mates, could anyone please have a look at my PR, which fixes the issue of 
incorrect part-file indexing in the BucketingSink component ?

Thx

> On 18 Jun 2018, at 10:55, Rinat  wrote:
> 
> I’ve created a JIRA issue https://issues.apache.org/jira/browse/FLINK-9603 
> <https://issues.apache.org/jira/browse/FLINK-9603> and added a proposal with 
> PR.
> 
> Thx
> 
>> On 16 Jun 2018, at 17:21, Rinat > <mailto:r.shari...@cleverdata.ru>> wrote:
>> 
>> Hi mates, since the 1.5 release BucketingSink has the ability to configure the 
>> suffix of the part file. It’s very useful when it’s necessary to set a specific 
>> extension for the file.
>> 
>> During usage I’ve found an issue - when a new part file is created, it 
>> has the same part index as the index of the just closed file. 
>> So, when Flink tries to move it into the final state, we get a 
>> FileAlreadyExistsException.
>> 
>> This problem is related to the following code:
>> Here we try to find the first part index whose file doesn’t exist in the 
>> bucket directory. The problem is that the partSuffix is not involved in the 
>> path assembly. This means that the checked path never exists
>> and partCounter is never incremented.
>> 
>> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> while (fs.exists(partPath) ||
>>       fs.exists(getPendingPathFor(partPath)) ||
>>       fs.exists(getInProgressPathFor(partPath))) {
>>    bucketState.partCounter++;
>>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
>> }
>> 
>> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
>> 
>> The partSuffix is appended only right before the writer is created, but it should 
>> already be appended before the index checks:
>> 
>> if (partSuffix != null) {
>>    partPath = partPath.suffix(partSuffix);
>> }
>> 
>> I’ll create an issue and try to submit a fix
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-18 Thread Rinat
I’ve created a JIRA issue https://issues.apache.org/jira/browse/FLINK-9603 
<https://issues.apache.org/jira/browse/FLINK-9603> and added a proposal with PR.

Thx

> On 16 Jun 2018, at 17:21, Rinat  wrote:
> 
> Hi mates, since the 1.5 release BucketingSink has the ability to configure the suffix of 
> the part file. It’s very useful when it’s necessary to set a specific 
> extension for the file.
> 
> During usage I’ve found an issue - when a new part file is created, it 
> has the same part index as the index of the just closed file. 
> So, when Flink tries to move it into the final state, we get a 
> FileAlreadyExistsException.
> 
> This problem is related to the following code:
> Here we try to find the first part index whose file doesn’t exist in the 
> bucket directory. The problem is that the partSuffix is not involved in the 
> path assembly. This means that the checked path never exists
> and partCounter is never incremented.
> 
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>       fs.exists(getPendingPathFor(partPath)) ||
>       fs.exists(getInProgressPathFor(partPath))) {
>    bucketState.partCounter++;
>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> 
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> 
> The partSuffix is appended only right before the writer is created, but it should 
> already be appended before the index checks:
> 
> if (partSuffix != null) {
>    partPath = partPath.suffix(partSuffix);
> }
> 
> I’ll create an issue and try to submit a fix
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[BucketingSink] incorrect indexing of part files, when part suffix is specified

2018-06-16 Thread Rinat
Hi mates, since the 1.5 release BucketingSink has the ability to configure the suffix of 
the part file. It’s very useful when it’s necessary to set a specific extension 
for the file.

During usage I’ve found an issue - when a new part file is created, it has 
the same part index as the index of the just closed file. 
So, when Flink tries to move it into the final state, we get a 
FileAlreadyExistsException.

This problem is related to the following code:
Here we try to find the first part index whose file doesn’t exist in the 
bucket directory. The problem is that the partSuffix is not involved in the path 
assembly. This means that the checked path never exists
and partCounter is never incremented.

Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
while (fs.exists(partPath) ||
      fs.exists(getPendingPathFor(partPath)) ||
      fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
}

bucketState.creationTime = processingTimeService.getCurrentProcessingTime();

The partSuffix is appended only right before the writer is created, but it should 
already be appended before the index checks:

if (partSuffix != null) {
   partPath = partPath.suffix(partSuffix);
}
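
A minimal sketch of the direction of the fix (assemblePartPath is an illustrative helper, not an existing method; it just makes the existence checks and the final file name agree on the suffix):

// Sketch only: build the candidate path with the configured suffix, so that the
// existence checks probe exactly the name that will eventually be created.
private Path assemblePartPath(Path bucketPath, int subtaskIndex, long partCounter) {
   String suffix = partSuffix != null ? partSuffix : "";
   return new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + partCounter + suffix);
}

Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
while (fs.exists(partPath) ||
      fs.exists(getPendingPathFor(partPath)) ||
      fs.exists(getInProgressPathFor(partPath))) {
   bucketState.partCounter++;
   partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
}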
I’ll create an issue and try to submit a fix

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [BucketingSink] notify on moving into pending/ final state

2018-06-14 Thread Rinat
Hi Piotr, I’ve created an issue https://issues.apache.org/jira/browse/FLINK-9592 
<https://issues.apache.org/jira/browse/FLINK-9592>

The third proposal looks great; may I try to contribute this issue ?

> On 14 Jun 2018, at 12:29, Piotr Nowojski  wrote:
> 
> Hi,
> 
> Couple of things:
> 
> 1. Please create a Jira ticket with this proposal, so we can move discussion 
> from user mailing list.
> 
> I haven’t thought it through, so take my comments with a grain of salt, 
> however:
> 
> 2. If we were to go with such a callback, I would prefer to have one 
> BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, 
> `onPendingToFinal`, `onPendingToCancelled(…)`, etc., as opposed to having one 
> interface passed three or four times for different purposes.
> 
> 3. Other thing that I had in mind is that BucketingSink could be rewritten to 
> extend TwoPhaseCommitSinkFunction. In that case, with 
> 
> public class BucketingSink2 extends TwoPhaseCommitSinkFunction
> 
> user could add his own hooks by overriding following methods
> 
> BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
> BucketingSink2#commit, BucketingSink2#abort. For example:
> 
> public class MyBucketingSink extends BucketingSink2 {
>   @Override
>   protected void  commit(??? txn) {
> super.commit(txn);
> // My hook on moving file from pending to commit state
>   };
> }
> 
> Alternatively, we could implement before mentioned callbacks support in 
> TwoPhaseCommitSinkFunction and provide such feature to 
> Kafka/Pravega/BucketingSink at once.
> 
> Piotrek

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [BucketingSink] notify on moving into pending/ final state

2018-06-13 Thread Rinat
Hi guys, thx for your reply.

The following code references are for the release-1.5.0 tag, 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class.

For now, BucketingSink has the following file lifecycle.

When moving files from the opened to the pending state:
- on each item (method invoke, line 434), we check that a suitable bucket exists and 
contains an opened file; if an opened file doesn’t exist, we create one and write 
the item to it
- on each item (method invoke, line 434), we check that the opened file doesn’t 
exceed the limits; if the limits are exceeded, we close it and move it into the 
pending state using the private method closeCurrentPartFile (line 568)
- on each timer request (onProcessingTime, line 482), we check whether items haven't 
been added to the opened file for longer than the configured period; if so, we close 
it using the same private method closeCurrentPartFile (line 588)

So the only way we have is to call our hook from closeCurrentPartFile, which is 
private, so we copy-pasted the current implementation and injected our logic there.


Files are moved from the pending state into the final state during the checkpointing 
lifecycle, in notifyCheckpointComplete (line 657), which is public but contains a lot of 
logic, including discovery of files in pending state, synchronization of state 
access and its modification, etc …

So we couldn’t override it, or call the super method and add some logic, because 
when the current implementation changes the state of files it removes them from the 
state, and we have no way to know 
for which files the state has been changed.

To solve this problem, we've created the following interface

/**
 * The {@code FileStateChangeCallback} is used to perform any additional operations when {@link BucketingSink}
 * moves a file from one state to another. For more information about the state management of
 * {@code BucketingSink}, look through its official documentation.
 */
public interface FileStateChangeCallback extends Serializable {

   /**
    * Used to perform any additional operations related to moving a file into the next state.
    *
    * @param fs provides access for working with the file system
    * @param path path to the file that was moved into the next state
    *
    * @throws IOException if something went wrong while performing operations on the file system
    */
   void call(FileSystem fs, Path path) throws IOException;
}
And we have added the ability to register these callbacks in our BucketingSink 
implementation in the following manner

public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
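
For illustration, a minimal usage sketch of the proposed API (registerOnFinalStateChangeCallback comes from our extension, not from the upstream BucketingSink, and the meta-file layout is just an example):

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;

// Sketch only: write a small meta-file next to each part file once it reaches the final state.
BucketingSink<String> sink = new BucketingSink<String>("/data/events")
      .registerOnFinalStateChangeCallback((fs, path) -> {
         Path metaPath = path.suffix(".meta");
         try (FSDataOutputStream out = fs.create(metaPath, false)) {
            out.write(path.toString().getBytes(StandardCharsets.UTF_8));
         }
      });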

I’m ready to discuss the best way such hooks could be implemented in the core 
implementation, or any other improvement that would let us add this 
functionality to our extension through the public API instead of copy-pasting the 
source code.

Thx for your help, mates =)


> On 11 Jun 2018, at 11:37, Piotr Nowojski  wrote:
> 
> Hi,
> 
> I see that could be a useful feature. What exactly now is preventing you from 
> inheriting from BucketingSink? Maybe it would be just enough to make the 
> BucketingSink easier extendable.
> 
> One thing now that could collide with such feature is that Kostas is now 
> working on larger BucketingSink rework/refactor. 
> 
> Piotrek
> 
>> On 8 Jun 2018, at 16:38, Rinat > <mailto:r.shari...@cleverdata.ru>> wrote:
>> 
>> Hi mates, I got a proposal about functionality of BucketingSink.
>> 
>> During implementation of one of our tasks we got the following need - create 
>> a meta-file, with the path and additional information about the file, 
>> created by BucketingSink, when it’s been moved into final place.
>> Unfortunately such behaviour is currently not available for us. 
>> 
>> We’ve implemented our own Sink, that provides an opportunity to register 
>> notifiers, that will be called, when file state is changing, but current API 
>> doesn’t allow us to add such behaviour using inheritance ...
>> 
>> It seems, that such functionality could be useful, and could be a part of 
>> BucketingSink API
>> What do you think, should I make a PR ?
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Checkpoint/ Savepoint usage

2018-06-13 Thread Rinat
Hi mates, while using BucketingSink I've decided to enable 
checkpointing, to prevent files from hanging in the open state on job failure.
But it seems that I haven’t properly understood the meaning of checkpointing …

I’ve enabled the fs backend for checkpoints, and while the job is running 
everything works fine: the file with the state is created, and if I kill the 
taskmanager, the state is restored.
But when I kill the whole job and run it again, the state from the last 
checkpoint isn’t used, and one more new state is created.
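
For reference, this is roughly the setup described above (a minimal sketch; the checkpoint path and interval are illustrative):

// Sketch only: fs state backend plus periodic checkpoints, as described above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60_000); // checkpoint every 60 seconds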

If I understood properly, the checkpointing state is used by the job manager while the job 
is running, and if I would like to cancel or kill the job, I should use 
savepoints.

So I've got the following questions:

are my assumptions about checkpoint/ savepoint state usage correct ?
when I’m creating a savepoint, can only hdfs be used as a backend ?
when I’m using RocksDB, can it only be used as a checkpointing backend, and 
when I decide to create a savepoint, will it be stored in hdfs ?
do we have any ability to configure the job to use the last checkpoint as the 
starting state out of the box ?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: State life-cycle for different state-backend implementations

2018-06-13 Thread Rinat
Hi Sihua, Thx for your reply

> On 9 Jun 2018, at 11:42, sihua zhou  wrote:
> 
> Hi Rinat,
> 
> I think there is one configuration, {{state.checkpoints.num-retained}}, to 
> control the maximum number of completed checkpoints to retain; the default 
> value is 1, so the risk you mentioned should not happen. Refer to 
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#checkpointing
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/config.html#checkpointing>
> , where you can find more checkpoint configuration options.
> 
> Best, Sihua
> 
> 
> On 06/8/2018 22:55,Rinat 
> <mailto:r.shari...@cleverdata.ru> wrote: 
> Hi mates, got a question about different state backends.
> 
> As far as I understand, on every checkpoint Flink flushes its current 
> state into the backend. In the case of FsStateBackend we’ll have a separate file for 
> each checkpoint, and during the job lifecycle we run the risk of 
> a huge number of state files in hdfs, which is not very good for the hadoop 
> name-node.
> 
> Does Flink have any clean-up strategies for its state in the different 
> backend implementations ? If you could provide any links where I could 
> read about the details of this process, it’ll be awesome ))
> 
> Thx a lot for your help.
> 
> Sincerely yours,
> Rinat Sharipov
> Software Engineer at 1DMP CORE Team
> 
> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
> mobile: +7 (925) 416-37-26
> 
> CleverDATA
> make your data clever
> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



State life-cycle for different state-backend implementations

2018-06-08 Thread Rinat
Hi mates, got a question about different state backends.

As far as I understand, on every checkpoint Flink flushes its current 
state into the backend. In the case of FsStateBackend we’ll have a separate file for 
each checkpoint, and during the job lifecycle we run the risk of 
a huge number of state files in hdfs, which is not very good for the hadoop 
name-node.

Does Flink have any clean-up strategies for its state in the different 
backend implementations ? If you could provide any links where I could read 
about the details of this process, it’ll be awesome ))

Thx a lot for your help.

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[BucketingSink] notify on moving into pending/ final state

2018-06-08 Thread Rinat
Hi mates, I’ve got a proposal about the functionality of BucketingSink.

During the implementation of one of our tasks we got the following need: create a 
meta-file, with the path and additional information about a file created by 
BucketingSink, when that file is moved into its final place.
Unfortunately such behaviour is currently not available to us. 

We’ve implemented our own Sink, which provides an opportunity to register 
notifiers that are called when the file state changes, but the current API 
doesn’t allow us to add such behaviour using inheritance ...

It seems that such functionality could be useful and could become part of the 
BucketingSink API.
What do you think, should I make a PR ?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Rinat
Chesnay, thx for your reply, I’ve created one 
https://issues.apache.org/jira/browse/FLINK-9558 
<https://issues.apache.org/jira/browse/FLINK-9558>
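
For what it’s worth, a minimal sketch of the early-failure check (the real fix would live inside BucketingSink itself; this wrapper only illustrates the idea, and the class name and message are illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

// Sketch only: fail early when checkpointing is disabled, instead of silently accumulating state.
public class CheckpointRequiringBucketingSink<T> extends BucketingSink<T> {

   public CheckpointRequiringBucketingSink(String basePath) {
      super(basePath);
   }

   @Override
   public void open(Configuration parameters) throws Exception {
      StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
      if (!context.isCheckpointingEnabled()) {
         throw new IllegalStateException(
               "BucketingSink requires checkpointing to be enabled, otherwise pending files "
                     + "are never committed and its state grows without bound.");
      }
      super.open(parameters);
   }
}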


> On 8 Jun 2018, at 12:58, Chesnay Schepler  wrote:
> 
> I agree, if the sink doesn't properly work without checkpointing we should 
> make sure that it fails early if it used that way.
> 
> It would be great if you could open a JIRA.
> 
> On 08.06.2018 10:08, Rinat wrote:
>> Piotr, thx for your reply, for now everything is pretty clear. But from my 
>> point of view, it’s better to add some information about leaks in case of 
>> disabled checkpointing into BucketingSink documentation
>> 
>>> On 8 Jun 2018, at 10:35, Piotr Nowojski >> <mailto:pi...@data-artisans.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> BucketingSink is designed to provide exactly-once writes to file system, 
>>> which is inherently tied to checkpointing. As you just saw, without 
>>> checkpointing, BucketingSink is never notified that it can commit pending 
>>> files. 
>>> 
>>> If you do not want to use checkpointing for some reasons, you could always 
>>> use for example 
>>> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
>>> and write your own simple `OutputFormat` or look if   one 
>>> of the existing ones meet your needs.
>>> 
>>> Piotrek
>>> 
>>>> On 7 Jun 2018, at 14:23, Rinat >>> <mailto:r.shari...@cleverdata.ru>> wrote:
>>>> 
>>>> Hi mates, we got some Flink jobs, that are writing data from kafka into 
>>>> hdfs, using Bucketing-Sink.
>>>> For some reasons, those jobs are running without checkpointing. For now, 
>>>> it not a big problem for us, if some files are remained opened in case of 
>>>> job reloading.
>>>> 
>>>> Periodically, those jobs fail with OutOfMemory exception, and seems, that 
>>>> I found a strange thing in the implementation of BucketingSink.
>>>> 
>>>> During the sink lifecycle, we have a state object, implemented as a map, 
>>>> where key is a bucket path, and value is a state, that contains 
>>>> information about opened files and list of pending files.
>>>> After researching of the heap dump, I found, that those state stores 
>>>> information about ~ 1_000 buckets and their state, all this stuff weights 
>>>> ~ 120 Mb.
>>>> 
>>>> I’ve looked through the code, and found, that we removing the buckets from 
>>>> the state, in notifyCheckpointComplete method. 
>>>> 
>>>> @Override
>>>> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>>>    Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
>>>>    while (bucketStatesIt.hasNext()) {
>>>>       BucketState<T> bucketState = bucketStatesIt.next().getValue();
>>>>       // ... handling of pending files for the completed checkpoint omitted ...
>>>>       if (!bucketState.isWriterOpen &&
>>>>             bucketState.pendingFiles.isEmpty() &&
>>>>             bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>>>> 
>>>>          // We've dealt with all the pending files and the writer for this bucket is not currently open.
>>>>          // Therefore this bucket is currently inactive and we can remove it from our state.
>>>>          bucketStatesIt.remove();
>>>>       }
>>>>    }
>>>> }
>>>> 
>>>> So, this looks like an issue, when you are using this sink in 
>>>> checkpointless environment, because the data always added to the state, 
>>>> but never removed.
>>>> Of course, we could enabled checkpointing, and use one of available 
>>>> backends, but as for me, it seems like a non expected behaviour, like I 
>>>> have an opportunity to run the job without checkpointing, but really, if I 
>>>> do so,
>>>> I got an exception in sink component.
>>>> 
>>>> What do you think about this ? Do anyone got the same problem, and how’ve 
>>>> you solved it ?
>>>> 
>>>> Sincerely yours,
>>>> Rinat Sharipov
>>>> Software Engineer at 1DMP CORE Team
>>>> 
>>>> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
>>>> mobile: +7 (925) 416-37-26
>>>> 
>>>> CleverDATA
>>>> make your data clever
>>>> 
>>> 
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: [flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-08 Thread Rinat
Piotr, thx for your reply, now everything is pretty clear. But from my 
point of view it would be better to add some information about this leak with 
disabled checkpointing to the BucketingSink documentation.

> On 8 Jun 2018, at 10:35, Piotr Nowojski  wrote:
> 
> Hi,
> 
> BucketingSink is designed to provide exactly-once writes to file system, 
> which is inherently tied to checkpointing. As you just saw, without 
> checkpointing, BucketingSink is never notified that it can commit pending 
> files. 
> 
> If you do not want to use checkpointing for some reasons, you could always 
> use for example 
> org.apache.flink.streaming.api.datastream.DataStream#writeUsingOutputFormat 
> and write your own simple `OutputFormat` or look if one of the existing ones 
> meet your needs.
> 
> Piotrek
> 
>> On 7 Jun 2018, at 14:23, Rinat > <mailto:r.shari...@cleverdata.ru>> wrote:
>> 
>> Hi mates, we got some Flink jobs, that are writing data from kafka into 
>> hdfs, using Bucketing-Sink.
>> For some reasons, those jobs are running without checkpointing. For now, it 
>> not a big problem for us, if some files are remained opened in case of job 
>> reloading.
>> 
>> Periodically, those jobs fail with OutOfMemory exception, and seems, that I 
>> found a strange thing in the implementation of BucketingSink.
>> 
>> During the sink lifecycle, we have a state object, implemented as a map, 
>> where key is a bucket path, and value is a state, that contains information 
>> about opened files and list of pending files.
>> After researching of the heap dump, I found, that those state stores 
>> information about ~ 1_000 buckets and their state, all this stuff weights ~ 
>> 120 Mb.
>> 
>> I’ve looked through the code, and found, that we removing the buckets from 
>> the state, in notifyCheckpointComplete method. 
>> 
>> @Override
>> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>>    Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
>>    while (bucketStatesIt.hasNext()) {
>>       BucketState<T> bucketState = bucketStatesIt.next().getValue();
>>       // ... handling of pending files for the completed checkpoint omitted ...
>>       if (!bucketState.isWriterOpen &&
>>             bucketState.pendingFiles.isEmpty() &&
>>             bucketState.pendingFilesPerCheckpoint.isEmpty()) {
>> 
>>          // We've dealt with all the pending files and the writer for this bucket is not currently open.
>>          // Therefore this bucket is currently inactive and we can remove it from our state.
>>          bucketStatesIt.remove();
>>       }
>>    }
>> }
>> 
>> So, this looks like an issue, when you are using this sink in checkpointless 
>> environment, because the data always added to the state, but never removed.
>> Of course, we could enabled checkpointing, and use one of available 
>> backends, but as for me, it seems like a non expected behaviour, like I have 
>> an opportunity to run the job without checkpointing, but really, if I do so,
>> I got an exception in sink component.
>> 
>> What do you think about this ? Do anyone got the same problem, and how’ve 
>> you solved it ?
>> 
>> Sincerely yours,
>> Rinat Sharipov
>> Software Engineer at 1DMP CORE Team
>> 
>> email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
>> mobile: +7 (925) 416-37-26
>> 
>> CleverDATA
>> make your data clever
>> 
> 

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



[flink-connector-filesystem] OutOfMemory in checkpointless environment

2018-06-07 Thread Rinat
Hi mates, we’ve got some Flink jobs that are writing data from kafka into hdfs, 
using Bucketing-Sink.
For some reasons, those jobs are running without checkpointing. For now it’s not 
a big problem for us if some files remain open in case of job 
reloading.

Periodically, those jobs fail with an OutOfMemory exception, and it seems that I 
found a strange thing in the implementation of BucketingSink.

During the sink lifecycle, we have a state object, implemented as a map, where 
the key is a bucket path and the value is a state that contains information about 
opened files and the list of pending files.
After researching the heap dump, I found that this state stores 
information about ~1,000 buckets and their state, and all this stuff weighs ~120 
Mb.

I’ve looked through the code and found that we remove buckets from the 
state in the notifyCheckpointComplete method. 

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
   Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt = state.bucketStates.entrySet().iterator();
   while (bucketStatesIt.hasNext()) {
      BucketState<T> bucketState = bucketStatesIt.next().getValue();
      // ... handling of pending files for the completed checkpoint omitted ...
      if (!bucketState.isWriterOpen &&
            bucketState.pendingFiles.isEmpty() &&
            bucketState.pendingFilesPerCheckpoint.isEmpty()) {

         // We've dealt with all the pending files and the writer for this bucket is not currently open.
         // Therefore this bucket is currently inactive and we can remove it from our state.
         bucketStatesIt.remove();
      }
   }
}

So this looks like an issue when you use this sink in a checkpointless 
environment, because data is always added to the state but never removed.
Of course, we could enable checkpointing and use one of the available backends, 
but as for me it seems like unexpected behaviour: I have the 
opportunity to run the job without checkpointing, but if I actually do so,
I get an exception in the sink component.

What do you think about this ? Has anyone had the same problem, and how did you 
solve it ?

Sincerely yours,
Rinat Sharipov
Software Engineer at 1DMP CORE Team

email: r.shari...@cleverdata.ru <mailto:a.totma...@cleverdata.ru>
mobile: +7 (925) 416-37-26

CleverDATA
make your data clever



Re: Flink send checkpointing message in IT

2017-11-07 Thread Rinat
Yes, but the notifyCheckpointComplete callback isn’t called by the time the await 
completes. I do the same as in the test template you mentioned:

ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
Future<Object> savepointResultFuture = jobManager.ask(
      new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), DEADLINE.timeLeft());

// busy-wait until the savepoint future completes
while (!savepointResultFuture.isCompleted()) {
   System.out.println();
}

Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());

if (savepointResult instanceof JobManagerMessages.TriggerSavepointFailure) {
   throw new RuntimeException(String.format("Something went wrong while executing the savepoint, [message=%s]",
         ((JobManagerMessages.TriggerSavepointFailure) savepointResult).cause()));
}
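
For completeness, a small sketch of the explicit success check that Chesnay asks about below (it relies only on the message classes already used above):

// Sketch only: verify the success case explicitly instead of only reacting to TriggerSavepointFailure.
if (!(savepointResult instanceof JobManagerMessages.TriggerSavepointSuccess)) {
   throw new IllegalStateException("Unexpected savepoint result: " + savepointResult);
}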

Thx

> On 7 Nov 2017, at 13:54, Chesnay Schepler <ches...@apache.org> wrote:
> 
> Do you verify that savepointResult is a 
> JobManagerMessages.TriggerSavepointSuccess? It could also be 
> JobManagerMessages.TriggerSavepointFailure. (instanceof check)
> 
> On 02.11.2017 19:11, Rinat wrote:
>> Chesnay, thanks for your reply, it was very helpful, but I took logic from 
>> this test template and tried to reuse it in my IT case, but found one more 
>> issue.
>> I’ve registered an accumulator in my source function, and for it’s value, as 
>> specified in the specified example.
>> When accumulator has an expected value, I perform a savepoint and wait for 
>> it’s completion using the further code
>> 
>> ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
>> Future<Object> savepointResultFuture = jobManager.ask(
>>       new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), DEADLINE.timeLeft());
>> Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());
>> Afterwards, if failures haven’t been detected I cancels my job and shutdowns 
>> cluster.
>> 
>> I found, that checkpoint method notifyCheckpointComplete not always called, 
>> before the savepointResult is ready. So the part of my logic, that lives in 
>> implementation of this method doesn’t work and test fails.
>> 
>> So could you or someone explain, does Flink guaranties, that 
>> notifyCheckpointComplete method will be called before savepointResult  will 
>> become accessable.
>> For me, it’s rather strange behaviour and I think that I’m doing something 
>> wrong.
>> 
>> Thx.
>> 
>>> On 1 Nov 2017, at 14:26, Chesnay Schepler <ches...@apache.org 
>>> <mailto:ches...@apache.org>> wrote:
>>> 
>>> You could trigger a savepoint, which from the viewpoint of 
>>> sources/operators/sinks is the same thing as a checkpoint.
>>> 
>>> How to do this depends a bit on how your test case is written, but you can 
>>> take a look at the SavepointMigrationTestBase#executeAndSavepoint which is 
>>> all about running josb and triggering
>>> savepoints once certain conditions have been met.
>>> 
>>> On 30.10.2017 16:01, Rinat wrote:
>>>> Hi guys, I’ve got a question about working with checkpointing.
>>>> I would like to implement IT test, where source is a fixed collection of 
>>>> items and sink performs additional logic, when checkpointing is completed.
>>>> 
>>>> I would like to force executing checkpointing, when all messages from my 
>>>> test source were sent and processed by sink.
>>>> Please tell me, whether such logic could be performed or not, and how.
>>>> 
>>>> Thx !
>>> 
>>> 
>> 
> 



Re: Flink send checkpointing message in IT

2017-11-02 Thread Rinat
Chesnay, thanks for your reply, it was very helpful. I took the logic from this 
test template and tried to reuse it in my IT case, but found one more issue.
I’ve registered an accumulator in my source function and wait for its value, as 
specified in the referenced example.
When the accumulator has the expected value, I perform a savepoint and wait for its 
completion using the following code

ActorGateway jobManager = (ActorGateway) Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
Future<Object> savepointResultFuture = jobManager.ask(
      new JobManagerMessages.TriggerSavepoint(jobId, Option.empty()), DEADLINE.timeLeft());
Object savepointResult = Await.result(savepointResultFuture, DEADLINE.timeLeft());

Afterwards, if no failures have been detected, I cancel my job and shut down the 
cluster.

I found that the notifyCheckpointComplete method is not always called before the 
savepointResult is ready, so the part of my logic that lives in the 
implementation of this method doesn’t run and the test fails.

So could you or someone else explain whether Flink guarantees that the 
notifyCheckpointComplete method will be called before the savepointResult 
becomes accessible?
For me it’s rather strange behaviour, and I think that I’m doing something 
wrong.

Thx.

> On 1 Nov 2017, at 14:26, Chesnay Schepler <ches...@apache.org> wrote:
> 
> You could trigger a savepoint, which from the viewpoint of 
> sources/operators/sinks is the same thing as a checkpoint.
> 
> How to do this depends a bit on how your test case is written, but you can 
> take a look at the SavepointMigrationTestBase#executeAndSavepoint which is 
> all about running josb and triggering
> savepoints once certain conditions have been met.
> 
> On 30.10.2017 16:01, Rinat wrote:
>> Hi guys, I’ve got a question about working with checkpointing.
>> I would like to implement IT test, where source is a fixed collection of 
>> items and sink performs additional logic, when checkpointing is completed.
>> 
>> I would like to force executing checkpointing, when all messages from my 
>> test source were sent and processed by sink.
>> Please tell me, whether such logic could be performed or not, and how.
>> 
>> Thx !
> 
> 



How to lock and wait, until checkpointing is completed

2017-10-30 Thread Rinat
Hi guys, got one more question for you; maybe someone has already implemented such a 
feature or found a good technique.

I wrote an IT that runs a Flink job, which reads data from a kafka topic and 
flushes it onto the fs using BucketingSink.
I implemented some custom logic that fires on notifyCheckpointComplete and 
would like to test it, so I need to block the job somehow and wait until 
checkpointing has been performed.

The first idea, which I've implemented, is to set the checkpointing interval to 
1 second and extend the test source to wait for a few seconds after all 
test messages have been sent to the sink.
The code looks something like this:
public class SleepingCollectionInputFormat<T> extends CollectionInputFormat<T> {

   private static final long serialVersionUID = -5957191172818298164L;

   private final long duration;

   public SleepingCollectionInputFormat(Collection<T> dataSet, TypeSerializer<T> serializer, long duration) {
      super(dataSet, serializer);
      this.duration = duration;
   }

   @Override
   public boolean reachedEnd() throws IOException {
      try {
         boolean reached = super.reachedEnd();
         if (reached) {
            Thread.sleep(duration);
         }
         return reached;
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
   }
}
This is not good enough, because we can't guarantee that 
everything is properly blocked and that checkpointing is actually triggered.
As for me, a much better approach would be to send some kind of notification to 
the task manager when all items from the source have been sent, signalling that it's 
time to perform a checkpoint.
Maybe someone knows how such a feature could be implemented ?

Another thing that could be implemented is to use a CountDownLatch: count down on 
the checkpoint notification and await at the end of the source function, but 
serialization makes that too complicated for me right now.

I'll be very grateful for your replies, answers and recommendations.

Thx ! 

Flink send checkpointing message in IT

2017-10-30 Thread Rinat
Hi guys, I’ve got a question about working with checkpointing. 
I would like to implement an IT test where the source is a fixed collection of items 
and the sink performs additional logic when checkpointing is completed.

I would like to force a checkpoint once all messages from my test 
source have been sent and processed by the sink.
Please tell me whether such logic is possible, and how.

Thx !

Re: How to test new sink

2017-10-23 Thread Rinat
Timo, thx for your reply.
I’m using gradle instead of maven, but I’ll look through similar existing 
plugins for it.

I don’t think that sharing test classes between projects is a good 
idea, but that’s out of scope of the current discussion. 
The main purpose of my request is to understand the existing best practices for 
testing functions that use the TimeService, and after that, if necessary, to 
create an issue with proposals.

Thx.

> On 23 Oct 2017, at 17:51, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Rinat,
> 
> using one of the Flink test utilities is a good approach to test your custom 
> operators. But of course these classes might change in the future. 
> 
> First of all, Flink is an open source project, so you can just copy the 
> required classes. However, it should be possible to use the Flink test 
> resources as explained here [1]. Flink adds the `test-jar` goal.
> 
> @Gary: You are working on the BucketingSink right now, right? Do you have a 
> suggestion?
> 
> Regards,
> Timo
> 
> 
> [1] 
> https://stackoverflow.com/questions/29653914/how-can-i-reference-unit-test-classes-of-a-maven-dependency-in-my-java-project
>  
> <https://stackoverflow.com/questions/29653914/how-can-i-reference-unit-test-classes-of-a-maven-dependency-in-my-java-project>
> 
> 
> Am 10/23/17 um 3:16 PM schrieb Rinat:
>> Hi !!!
>> 
>> I’ve just implemented a new sink, that extends functionality of existing 
>> BucketingSink, currently I’m trying to test functionality, that is related 
>> with timing.
>> My sink implements ProcessingTimeCallback, similarly with the original 
>> BucketingSink. I’m trying to inject TestProcessingTimeService to test 
>> 
>> I discovered, that original tests using some kind of test templates for 
>> testing functions OneInputStreamOperatorTestHarness, extending which test 
>> impl of ProcessingTimeService is injected, 
>> but this templates are in test scope of flink-streaming-java module, so it 
>> could’t be accessed by the external projects.
>> 
>> May be someone could help me and explain the better approach for testing 
>> sink functionality ?
>> 
>> Thx !
>> 
>> 
> 



How to test new sink

2017-10-23 Thread Rinat
Hi !!!

I’ve just implemented a new sink that extends the functionality of the existing 
BucketingSink, and currently I’m trying to test the functionality that is related to 
timing.
My sink implements ProcessingTimeCallback, similarly to the original 
BucketingSink, and I’m trying to inject a TestProcessingTimeService to test it.

I discovered that the original tests use test templates such as 
OneInputStreamOperatorTestHarness, through which a test implementation of 
ProcessingTimeService is injected, 
but these templates are in the test scope of the flink-streaming-java module, so they 
can’t be accessed by external projects.
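
For context, this is roughly the kind of test I’d like to write against those templates (a sketch only; OneInputStreamOperatorTestHarness lives in the test scope of flink-streaming-java, and the constructor arguments are assumed from the tests there):

import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Test;

public class MySinkTimingTest {

   @Test
   public void testProcessingTimeCallbackFires() throws Exception {
      // Stand-in for the custom sink under test (mine extends BucketingSink).
      BucketingSink<String> sink = new BucketingSink<>("/tmp/out");

      OneInputStreamOperatorTestHarness<String, Object> harness =
            new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink), 10, 1, 0);
      harness.setup();
      harness.open();

      harness.setProcessingTime(0L);                          // the harness drives processing time
      harness.processElement(new StreamRecord<>("test", 1L));
      harness.setProcessingTime(10_000L);                     // fires registered ProcessingTimeCallbacks

      harness.close();
   }
}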

Maybe someone could help me and explain a better approach for testing sink 
functionality ?

Thx !




BucketingSink with disabled checkpointing will never clean up it's state

2017-10-20 Thread Rinat
Hi, got one more little question about BucketingSink with disabled 
checkpointing.
In terms of my current task, I’m looking through the sources of BucketingSink, and 
it seems that I’ve found an issue for the case when checkpointing is disabled.

BucketingSink is a Flink rich function that also implements the checkpointing 
interface, so it stores all metadata about open writers in its state.
On each invoke call we create new writers if a file with a new path needs to be 
opened, and on writer creation we add it to the state meta info. 

Later, when the file size exceeds the configured limit or the timer tells us that open 
writers should be closed, the sink closes the writers, moves the files into the pending 
state, and updates its state by adding the file names of the closed writers
to the collection of pending files.

Later, if checkpointing is enabled, the sink is notified, and it moves all 
pending files from the pending to the final state and cleans up its current state.
But what happens if checkpointing is disabled ? 

I looked through the code base of BucketingSink, and it seems to me that the state is 
never cleaned up and the number of managed files grows forever when 
checkpointing is disabled.
I could provide more information if necessary; currently my suspicions are 
based only on research of the code base.

Does anyone use BucketingSink with disabled checkpointing in a prod environment ?

Thx.




Re: Flink BucketingSink, subscribe on moving of file into final state

2017-10-20 Thread Rinat
Piotrek, thanks for your reply.

Yes, now I’m looking for the most suitable way to extend the BucketingSink 
functionality to handle the moment a file is moved into its final state.
I thought that maybe someone has already implemented such a thing or knows any 
other approach that will help me avoid copy/pasting the existing sink impl ))

Thx !


> On 20 Oct 2017, at 14:37, Piotr Nowojski  wrote:
> 
> Piotrek



Flink BucketingSink, subscribe on moving of file into final state

2017-10-20 Thread Rinat
Hi All !

I’m trying to create a meta-info file that contains a link to a file created by 
Flink BucketingSink.
At first I was trying to implement my own 
org.apache.flink.streaming.connectors.fs.Writer that creates a meta-file when its 
close method is called. 
But I realized that this is not completely right, because when the writer is 
closed, the file into which the data was written is in the in-progress state, and in its 
final state it will change its name. 
So creating any meta-info on writer close that links to the in-progress file 
would lead my system into an inconsistent state.

I looked through the sources of BucketingSink and have not found an elegant 
way to subscribe to the moment a data file is moved into its final 
state.
Maybe someone has already had the same issue and found an elegant way to solve it ?

Also, maybe someone knows how this issue could be solved using other Flink tools/ 
components, because I haven't been using Flink for long and maybe don’t know some of 
its features.

Thx.

Flink Job Deployment (Not enough resources)

2017-09-04 Thread Rinat
Hi everyone, I’ve got the following problem: when I try to submit a new job 
and the cluster doesn’t have enough resources, the job submission fails with the 
exception below.
But in YARN the job hangs and waits for the requested resources. When resources become 
available, the job runs successfully.

What can I do to be sure that the job startup has either completed successfully or 
completely failed ?

Thx.

The program finished with the following exception:

java.lang.RuntimeException: Unable to tell application master to stop once the specified job has been finised
    at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:177)
    at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:201)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
    at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:76)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
    at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
    at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
    at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
    at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:789)
    at org.apache.flink.yarn.YarnClusterClient.stopAfterJob(YarnClusterClient.java:171)
    ... 15 more
Caused by: org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException: Could not retrieve the leader gateway.
    at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:79)
    at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:784)
    ... 16 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at scala.concurrent.Await.result(package.scala)
    at org.apache.flink.runtime.util.LeaderRetrievalUtils.retrieveLeaderGateway(LeaderRetrievalUtils.java:77)
    ... 17 more

Flink Job Deployment

2017-09-04 Thread Rinat
Hi folks ! 
I’ve got a question about running a Flink job on top of YARN. 
Is there any possibility to store the job artifacts in hdfs, for example

/app/flink/job-name/ 
  - /lib/*.jar
  - /etc/*.properties

and to specify the directories that should be added to the job classpath ?

Thx.





Re: write into hdfs using avro

2017-07-27 Thread Rinat
Hi Gordon, thx for your reply - already implemented ;)
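
For reference, a minimal sketch of the kind of value-only writer I mean (the class is illustrative rather than the exact production code; it assumes GenericRecord values and a schema known up front):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: a value-only Avro writer for BucketingSink; codec choice and
// error handling are simplified.
public class AvroValueWriter implements Writer<GenericRecord> {

   private final String schemaString;

   private transient FSDataOutputStream outputStream;
   private transient DataFileWriter<GenericRecord> dataFileWriter;

   public AvroValueWriter(Schema schema) {
      this.schemaString = schema.toString(); // keep a serializable representation of the schema
   }

   @Override
   public void open(FileSystem fs, Path path) throws IOException {
      Schema schema = new Schema.Parser().parse(schemaString);
      outputStream = fs.create(path, false);
      dataFileWriter = new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
      dataFileWriter.create(schema, outputStream);
   }

   @Override
   public void write(GenericRecord element) throws IOException {
      dataFileWriter.append(element);
   }

   @Override
   public long flush() throws IOException {
      dataFileWriter.flush();
      return outputStream.getPos();
   }

   @Override
   public long getPos() throws IOException {
      return outputStream.getPos();
   }

   @Override
   public void close() throws IOException {
      if (dataFileWriter != null) {
         dataFileWriter.close();
      }
   }

   @Override
   public Writer<GenericRecord> duplicate() {
      return new AvroValueWriter(new Schema.Parser().parse(schemaString));
   }
}

It is then registered on the sink via BucketingSink#setWriter(...), as Gordon suggests below.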

> On 27 Jul 2017, at 12:57, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> 
> Hi!
> 
> Yes, you can provide a custom writer for the BucketingSink via 
> BucketingSink#setWriter(…).
> The AvroKeyValueSinkWriter is a simple example of a writer that uses Avro for 
> serialization, and takes as input KV 2-tuples.
> If you want to have a writer that takes as input your own event types, AFAIK 
> you’ll need to implement your own Writer.
> 
> Cheers,
> Gordon
> 
> On 21 July 2017 at 7:31:21 PM, Rinat (r.shari...@cleverdata.ru 
> <mailto:r.shari...@cleverdata.ru>) wrote:
> 
>> Hi, folks !
>> 
>> I’ve got a little question: I’m trying to save a stream of events from Kafka 
>> into HDFS using 
>> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink with AVRO 
>> serialization. 
>> If I properly understood, I should use some implementation of 
>> org.apache.flink.streaming.connectors.fs.Writer for this purpose.
>> 
>> I found an existing implementation of an avro writer, 
>> org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter<K, V>, but 
>> my stream contains only values. 
>> What do I need to do if I want to write values from a stream using a 
>> BucketingSink in avro format ?
>> 
>> Thx.



write into hdfs using avro

2017-07-21 Thread Rinat
Hi, folks !

I’ve got a little question: I’m trying to save a stream of events from Kafka into 
HDFS using org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink 
with AVRO serialization. 
If I properly understood, I should use some implementation of 
org.apache.flink.streaming.connectors.fs.Writer for this purpose.

I found an existing implementation of an avro writer, 
org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter<K, V>, but my 
stream contains only values. 
What do I need to do if I want to write values from a stream using a BucketingSink 
in avro format ?

Thx.