Re: Sink - writeAsText problem

2016-05-03 Thread Fabian Hueske
Yes, but be aware that your program runs with parallelism 1 if you do not
configure the parallelism.
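
For example, both settings discussed in this thread can be set cluster-wide in
flink-conf.yaml. A minimal sketch (the parallelism value is just a placeholder):

# force sinks to create a directory even when they run with parallelism 1
fs.output.always-create-directory: true
# default parallelism used when a program does not set one explicitly
parallelism.default: 4

Alternatively, the parallelism can be set per program via
ExecutionEnvironment#setParallelism(int).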

2016-05-03 11:07 GMT+02:00 Punit Naik :

> Hi Stephen, Fabian
>
> setting "fs.output.always-create-directory" to true in flink-config.yml
> worked!
>
> On Tue, May 3, 2016 at 2:27 PM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> There is the option to always create a directory:
>> "fs.output.always-create-directory"
>>
>> See
>> https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/config.html#file-systems
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, May 3, 2016 at 9:26 AM, Punit Naik 
>> wrote:
>>
>>> Hello
>>>
>>> I executed my Flink code in eclipse and it properly generated the output
>>> by creating a folder (as specified in the string) and placing output files
>>> in them.
>>>
>>> But when I exported the project as JAR and ran the same code using
>>> ./flink run, it generated the output, but instead of creating a folder with
>>> files in it, it just created a single file (as specified in the string)
>>> (output was correct though).
>>>
>>> Why does this happen? I want Flink to write its output in folder.
>>>
>>> --
>>> Thank You
>>>
>>> Regards
>>>
>>> Punit Naik
>>>
>>
>>
>
>
> --
> Thank You
>
> Regards
>
> Punit Naik
>


Re: Creating a custom operator

2016-05-03 Thread Fabian Hueske
Hi Simone,

you are right, the interfaces you extend are not considered a public,
user-facing API.
Adding custom operators to the DataSet API touches many parts of the system
and is not straightforward.
The DataStream API has better support for custom operators.
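
For example, a custom operator can be plugged into a DataStream via the
transform() method. A rough sketch against the 1.0 streaming operator
interfaces (these are low-level, internal APIs and may change; the operator
below only forwards records unchanged):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// minimal pass-through operator; real logic would go into processElement()
public class PassThroughOperator<T> extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T> {

  @Override
  public void processElement(StreamRecord<T> element) throws Exception {
    output.collect(element);      // forward the record unchanged
  }

  @Override
  public void processWatermark(Watermark mark) throws Exception {
    output.emitWatermark(mark);   // forward event-time progress
  }
}

// usage, keeping the input type:
// stream.transform("pass-through", stream.getType(), new PassThroughOperator<MyType>());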

Can you explain what kind of operator you would like to add?
Maybe the functionality can be achieved with the existing operators.

Best, Fabian

2016-05-03 12:54 GMT+02:00 Simone Robutti :

> Hello Fabian,
>
> we delved deeper, moving on from the input you gave us, but a question arose. We
> always assumed that runtime operators were open for extension without
> modifying anything inside Flink, but it looks like this is not the case and
> the documentation assumes that the developer is working on a contribution
> to Flink. So I would like to know if our understanding is correct and
> custom runtime operators are not supposed to be implemented outside of
> Flink.
>
> Thanks,
>
> Simone
>
> 2016-04-29 21:32 GMT+02:00 Fabian Hueske :
>
>> Hi Simone,
>>
>> the GraphCreatingVisitor transforms the common operator plan into a
>> representation that is translated by the optimizer.
>> You have to implement an OptimizerNode and OperatorDescriptor to describe
>> the operator.
>> Depending on the semantics of the operator, there are a few more places
>> to touch to make the integration work, like driver strategies, the cost model, etc.
>>
>> I would recommend to have a look at previous changes that added an
>> operator such as PartitionOperator, SortPartitionOperator, OuterJoin, etc.
>> The respective commits should give you an idea which parts of the code
>> need to be touched. You should find the commit IDs in the JIRA issues for
>> these extensions.
>>
>> Cheers, Fabian
>>
>>
>>
>>
>>
>> 2016-04-29 15:32 GMT+02:00 Simone Robutti :
>>
>>> Hello,
>>>
>>> I'm trying to create a custom operator to explore the internals of
>>> Flink. Actually, the one I'm working on is rather similar to Union and I'm
>>> trying to mimic it for now. When I run my job, though, this error arises:
>>>
>>> Exception in thread "main" java.lang.IllegalArgumentException: Unknown
>>> operator type: MyOperator - My Operator
>>> at
>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:237)
>>> at
>>> org.apache.flink.optimizer.traversals.GraphCreatingVisitor.preVisit(GraphCreatingVisitor.java:82)
>>> at
>>> org.apache.flink.api.common.operators.DualInputOperator.accept(DualInputOperator.java:279)
>>> at
>>> org.apache.flink.api.common.operators.GenericDataSinkBase.accept(GenericDataSinkBase.java:223)
>>> at org.apache.flink.api.common.Plan.accept(Plan.java:348)
>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:454)
>>> at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:398)
>>> at
>>> org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:213)
>>> at
>>> org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:107)
>>> at io.radicalbit.flinkh2o.Job$.main(Job.scala:50)
>>> at io.radicalbit.flinkh2o.Job.main(Job.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>
>>> I looked at the location of the error, but it's not clear to me how to
>>> make my operator recognizable to the optimizer.
>>>
>>> Thanks,
>>>
>>> Simone
>>>
>>
>>
>


Re: Discussion about a Flink DataSource repository

2016-05-04 Thread Fabian Hueske
Hi Flavio,

I thought a bit about your proposal. I am not sure if it is actually
necessary to integrate a central source repository into Flink. It should be
possible to offer this as an external service which is based on the
recently added TableSource interface. TableSources could be extended to be
able to serialize and deserialize their configuration to/from JSON. When
the external repository service starts, it can read the JSON configurations
and instantiate and register TableSource objects. The repository could also
hold metadata about the sources and serve a (web) UI to list the available
sources. When a Flink program wants to access a data source which is
registered in the repository, it could look up the respective TableSource
object from the repository.

Given that an integration of metadata with Flink user functions (point 2
in your proposal) is a very special requirement, I am not sure how much
"native" support should be added to Flink. Would it be possible to add a
lineage tag to each record and ship the metadata of all sources as a
broadcast set to each operator? Then user functions could look up the
metadata from the broadcast set.

Best, Fabian

2016-04-29 12:49 GMT+02:00 Flavio Pompermaier :

> Hi to all,
>
> as discussed briefly with Fabian, for our products in Okkam we need a
> central repository of DataSources processed by Flink.
> With respect to existing external catalogs, such as Hive or Confluent's
> SchemaRegistry, whose objective is to provide necessary metadata to
> read/write the registered tables, we would also need a way to access
> other general metadata (e.g. name, description, creator, creation date,
> lastUpdate date, processedRecords, certificationLevel of provided data,
> provenance, language, etc).
>
> This integration has 2 main goals:
>
>1. In a UI: to enable the user to choose (or even create) a datasource
>to process with some task (e.g. quality assessment) and then see its
>metadata (name, description,  creator user, etc)
>2. During a Flink job: when 2 datasource gets joined and we have
>multiple values for an attribute (e.g. name or lastname) we can access the
>datasource metadata to decide which value to retain (e.g. the one coming
>from the most authoritative/certified source for that attribute)
>
> We also think that this could be of interest for projects like Apache
> Zeppelin or Nifi enabling them to suggest to the user the sources to start
> from.
>
> Do you think it makes sense to think about designing such a module for
> Flink?
>
> Best,
> Flavio
>


Re: Any way for Flink Elasticsearch connector reflecting IP change of Elasticsearch cluster?

2016-05-04 Thread Fabian Hueske
I'm not too familiar with the Kafka connector.
Can you post your suggestion to the user or dev mailing list?

Thanks, Fabian

2016-05-04 16:53 GMT+02:00 Sendoh :

> Glad to see it's developing.
> Can I ask would the same feature (reconnect) be useful for Kafka connector
> ?
> For example, if the IP of broker changes.
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Any-way-for-Flink-Elasticsearch-connector-reflecting-IP-change-of-Elasticsearch-cluster-tp6597p6688.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Any way for Flink Elasticsearch connector reflecting IP change of Elasticsearch cluster?

2016-05-04 Thread Fabian Hueske
Sorry, I confused the mail threads. We're already on the user list :-)
Thanks for the suggestion.

2016-05-04 17:35 GMT+02:00 Fabian Hueske :

> I'm not so much familiar with the Kafka connector.
> Can you post your suggestion to the user or dev mailing list?
>
> Thanks, Fabian
>
> 2016-05-04 16:53 GMT+02:00 Sendoh :
>
>> Glad to see it's developing.
>> Can I ask would the same feature (reconnect) be useful for Kafka
>> connector ?
>> For example, if the IP of broker changes.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Any-way-for-Flink-Elasticsearch-connector-reflecting-IP-change-of-Elasticsearch-cluster-tp6597p6688.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: Prevent job/operator from spilling to disk

2016-05-04 Thread Fabian Hueske
Hi Max,

it is not possible to deactivate spilling to disk at the moment.
It might be possible to implement, but this would require a few more
changes to make it feasible.
For instance, we would need to add more fine-grained control over how
memory is distributed among operators.
This is currently statically decided by the optimizer and there is no
parameter to influence it.

Cheers, Fabian

2016-05-04 13:46 GMT+02:00 Maximilian Bode :

> Hi everyone,
>
> is there a way to prevent operators from spilling to disk? If not, would
> it be conceivable to make this configurable either per job or operator?
>
> The use case is a batch application with the formal requirement not to
> persist in-flight data to disk (even temporarily) so it would be preferable
> to see the job fail and then be able to grant sufficient memory and run it
> again.
>
> Cheers,
> Max
>
> —
> Maximilian Bode * Software Consultant * maximilian.b...@tngtech.com
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: Discussion about a Flink DataSource repository

2016-05-06 Thread Fabian Hueske
Hi Flavio,

I'll open a JIRA for de/serializing TableSource to textual JSON.

Would something like this work for you?

main() {
  ExecutionEnvironment env = ...
  TableEnvironment tEnv = ...

  // accessing an external catalog
  YourTableSource ts = Catalog.getTableSource("someIdentifier");
  tEnv.registerTableSource("someId", ts);

  // preparing meta data
  MetaData meta = ts.getMetaData();
  DataSet<MetaData> metaDS = env.fromElements(meta);

  // read data, table transformations + conversion to DataSet
  Table t = tEnv.scan("someId"); // apply some Table transformations if necessary
  DataSet<TupleX> ds = tEnv.toDataSet(t, TupleX.class);

  // apply custom functions on data set
  ds.map(new MetaMapFunctionWrapper(new MetaMapFunction()))
    .withBroadcastSet(metaDS, "meta");

  // continue program

}

The MetaMapFunctionWrapper could be a RichMapFunction that accesses the
metadata from the broadcast set and provides it to the wrapped
MetaMapFunction (an extended MapFunction with a custom interface for
metadata).

Depending on what kind of interface you plan to offer, you can hide most of
the complexity, e.g., users would only have to implement a MetaMapFunction
and would not have to deal with broadcasting and accessing the metadata
(this would be done by your wrapper).

Fabian



2016-05-05 10:08 GMT+02:00 Flavio Pompermaier :

> HI Fabian,
> thanks for your detailed answer, as usual ;)
>
> I think that an external service is ok; actually, I wasn't aware of the
> TableSource interface.
> As you said, a utility to serialize and deserialize them would be very
> helpful and will make this easier.
> However, registering metadata for a table is a very common task to do.
> Wouldn't it be useful for other Flink-related projects (I was thinking of
> Nifi, for example) to define a common minimal set of (optional) metadata to
> display in a UI for a TableSource (like name, description, creationDate,
> creator, field aliases)?
>
> About point 2, I think that dataset broadcasting or closure variables are
> useful when you write a program, not if you try to "compose" it using
> reusable UDFs (using a script like in Pig).
> Of course, the worst case scenario for us (e.g. right now) is to connect
> to our repository within rich operators but I thought that it could be easy
> to define a link from operators to TableEnvironment and then to TableSource
> (using the lineage tag/source-id you said) and, finally to its metadata. I
> don't know whether this is specific only to us, I just wanted to share our
> needs and see if the table API development could benefit from them.
>
> Best,
> Flavio
>
> On Wed, May 4, 2016 at 10:35 AM, Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> I thought a bit about your proposal. I am not sure if it is actually
>> necessary to integrate a central source repository into Flink. It should be
>> possible to offer this as an external service which is based on the
>> recently added TableSource interface. TableSources could be extended to be
>> able to serialize and descerialize their configuration to/from JSON. When
>> the external repository service starts, it can read the JSON fields and
>> instantiate and register TableSource objectes. The repository could also
>> hold metadata about the sources and serve a (web) UI to list available
>> source. When a Flink program wants to access a data source which is
>> registered in the repository, it could lookup the respective TableSouce
>> object from the repository.
>>
>> Given that an integration of metadata with Flink user functions (point 2.
>> in your proposal) is a very special requirement, I am not sure how much
>> "native" support should be added to Flink. Would it be possible to add a
>> lineage tag to each record and ship the metadata of all sources as
>> broadcast set to each operator? Then user functions could lookup the
>> metadata from the broadcast set.
>>
>> Best, Fabian
>>
>> 2016-04-29 12:49 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi to all,
>>>
>>> as discussed briefly with Fabian, for our products in Okkam we need a
>>> central repository of DataSources processed by Flink.
>>> With respect to existing external catalogs, such as Hive or Confluent's
>>> SchemaRegistry, whose objective is to provide necessary metadata to
>>> read/write the registered tables, we would also need a way to acess to
>>> other general metadata (e.g. name, description, creator, creation date,
>>> lastUpdate date, processedRecords, certificationLevel of provided data,
>>> provenance, language, etc).
>>>
>>> This integration has 2 main goals:
>>>
>>>1. In

Re: OutputFormat in streaming job

2016-05-06 Thread Fabian Hueske
Hi Andrea,

you can use any OutputFormat to emit data from a DataStream using the
writeUsingOutputFormat() method.
However, this method does not guarantee exactly-once processing. In case of
a failure, it might emit some records a second time. Hence the results will
be written at least once.
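
For example, a minimal sketch (TextOutputFormat and the output path are just
placeholders; any OutputFormat can be plugged in the same way):

import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OutputFormatAsSink {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream = env.fromElements("a", "b", "c");
    // the OutputFormat is opened once per parallel sink task
    stream.writeUsingOutputFormat(new TextOutputFormat<String>(new Path("hdfs:///tmp/out")));
    env.execute("OutputFormat as sink");
  }
}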

Hope this helps,
Fabian

2016-05-06 12:45 GMT+02:00 Andrea Sella :

> Hi,
>
> I created a custom OutputFormat to send data to a remote actor. Are there
> any issues with using an OutputFormat in a streaming job? Or will it be
> treated like a Sink?
>
> I prefer to use it in order to create a custom ActorSystem per TM in the
> configure method.
>
> Cheers,
> Andrea
>


Re: Where to put live model and business logic in Hadoop/Flink BigData system

2016-05-06 Thread Fabian Hueske
Hi Palle,

this sounds indeed like a good use case for Flink.

Depending on the complexity of the aggregated historical views, you can
implement a Flink DataStream program which builds the views on the fly,
i.e., you do not need to periodically trigger MR/Flink/Spark batch jobs to
compute the views. Instead, you can use the concept of windows to group
data by time (and other attributes) and compute the aggregates (depends on
the type of aggregates) on-the-fly while data is arriving.

The live model can also be computed by Flink. You can access the historic
data from an external store (HBase / MongoDB) and also cache parts of it in
the Flink job to achieve lower latency. It is also possible to store the live
model in your Flink job and query it from there (see this blogpost [1],
section "Winning Twitter Hack Week: Eliminating the key-value store
bottleneck"). Flink will partition the data, so it should be able to handle
the data sizes you mentioned.
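
For example, a rough sketch of such an on-the-fly aggregation (key, window
size, and aggregate are placeholders; the real job would read from Kafka
instead of fromElements):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedViewSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // toy input; the real job would use a Kafka source here
    DataStream<Tuple2<String, Long>> events = env.fromElements(
        Tuple2.of("user-1", 1L), Tuple2.of("user-2", 1L), Tuple2.of("user-1", 1L));

    // continuously maintained aggregate per key and 10-minute window,
    // instead of periodically scheduled batch jobs
    DataStream<Tuple2<String, Long>> counts = events
        .keyBy(0)
        .timeWindow(Time.minutes(10))
        .sum(1);

    counts.print();
    env.execute("windowed aggregation sketch");
  }
}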

Best, Fabian

[1] http://data-artisans.com/extending-the-yahoo-streaming-benchmark/

2016-05-06 13:40 GMT+02:00 Deepak Sharma :

> I see the flow to be as below:
> LogStash->Log Stream->Flink ->Kafka->Live Model
>|
> Mongo/HBASE
>
> The Live Model will again be Flink streaming data sets from Kakfa.
> There you analyze the incoming stream for the certain value and once you
> find this certain value, read the historical view and then do the analysis
> in Flink itself.
> For your java objects , i guess you can use checkpointed interface (have
> not used it though yet)
>
> Thanks
> Deepak
>
>
> On Fri, May 6, 2016 at 4:22 PM,  wrote:
>
>> Hi there.
>>
>> We are putting together some BigData components for handling a large
>> amount of incoming data from different log files and perform some analysis
>> on the data.
>>
>> All data being fed into the system will go into HDFS. We plan on using
>> Logstash, Kafka and Flink for bringing data from the log files and into
>> HDFS. All our data located in HDFS we will designate as our historic data
>> and we will use MapReduce (probably Flink, but could also be Hadoop) to
>> create some aggregate views of the historic data. These views we will
>> locate probably in HBase or MongoDB.
>>
>> These views of the historic data (also called batch views in the Lambda
>> Architecture if any of you are familiar with that) we will use from the
>> live model in the system. The live model is also being fed with the same
>> data (through Kafka) and when the live model detects a certain value in the
>> incoming data, it will perform some analysis using the views in
>> HBase/MongoDB of the historic data.
>>
>> Now, could anyone share some knowledge regarding where it would be
>> possible to implement such a live model given the components we plan on
>> using? Apart from the business logic that will perform the analysis, our
>> live model will at all times also contain a java object structure of maybe
>> 5-10 java collections (maps, lists) containing approx 5 mio objects.
>>
>> So, where is it possible to implement our live model? Can we do this in
>> Flink? Can we do this with another component within the Hadoop Big Data
>> ecosystem?
>>
>> Thanks.
>>
>> /Palle
>>
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


Re: OutputFormat in streaming job

2016-05-06 Thread Fabian Hueske
Hi Andrea,

actually, OutputFormat.configure() will also be invoked per task. So you
would also end up with 16 ActorSystems.
However, I think you can use a synchronized singleton object to start one
ActorSystem per TM (each TM and all of its tasks run in a single JVM).

So from the point of view of configure(), I think it does not make a
difference whether to use an OutputFormat or a RichSinkFunction.
I would rather go for the SinkFunction, which is better suited for
streaming jobs.
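
A rough sketch of such a holder (the ActorSystem name is a placeholder; the
same pattern works from a RichSinkFunction's open() or an OutputFormat's
configure()):

import akka.actor.ActorSystem;

// JVM-wide holder: all tasks of a TaskManager share one ActorSystem
public final class ActorSystemHolder {

  private static ActorSystem instance;

  private ActorSystemHolder() {}

  public static synchronized ActorSystem get() {
    if (instance == null) {
      // created only once per TaskManager JVM, no matter how many tasks call get()
      instance = ActorSystem.create("sink-actor-system");
    }
    return instance;
  }
}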

Cheers, Fabian

2016-05-06 14:10 GMT+02:00 Andrea Sella :

> Hi Fabian,
>
> ATM I am not interesting to guarantee exactly-once processing, thank you
> for the clarification.
>
> As far as I know, there is no method similar to OutputFormat's configure()
> for RichSinkFunction, correct? So I am not able to instantiate an
> ActorSystem per TM but have to instantiate an ActorSystem per TaskSlot,
> which is unsuitable because an ActorSystem is very heavy.
>
> Example:
> OutputFormat => 2 TMs => parallelism 16 => 2 ActorSystems (instantiated in
> OutputFormat's configure method)
> Sink => 2 TMs => parallelism 16 => 16 ActorSystems (instantiated in
> RichSinkFunction's open method)
>
> Am I wrong?
>
> Thanks again,
> Andrea
>
> 2016-05-06 13:47 GMT+02:00 Fabian Hueske :
>
>> Hi Andrea,
>>
>> you can use any OutputFormat to emit data from a DataStream using the
>> writeUsingOutputFormat() method.
>> However, this method does not guarantee exactly-once processing. In case
>> of a failure, it might emit some records a second time. Hence the results
>> will be written at least once.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-05-06 12:45 GMT+02:00 Andrea Sella :
>>
>>> Hi,
>>>
>>> I created a custom OutputFormat to send data to a remote actor, there
>>> are issues to use an OutputFormat into a stream job? Or it will treat like
>>> a Sink?
>>>
>>> I prefer to use it in order to create a custom ActorSystem per TM in the
>>> configure method.
>>>
>>> Cheers,
>>> Andrea
>>>
>>
>>
>


Re: Discussion about a Flink DataSource repository

2016-05-06 Thread Fabian Hueske
Yes, you can transform the broadcast set when it is accessed with
RuntimeContext.getBroadcastVariableWithInitializer() and a
BroadcastVariableInitializer.
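
For example, a rough sketch (MetaData and its getSourceId() accessor are
hypothetical placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.BroadcastVariableInitializer;

// turns the broadcast List of MetaData into a Map indexed by source id;
// the transformation is applied lazily when the variable is first accessed
public class MetaMapInitializer
    implements BroadcastVariableInitializer<MetaData, Map<String, MetaData>> {

  @Override
  public Map<String, MetaData> initializeBroadcastVariable(Iterable<MetaData> data) {
    Map<String, MetaData> byId = new HashMap<>();
    for (MetaData m : data) {
      byId.put(m.getSourceId(), m);   // getSourceId() is a hypothetical accessor
    }
    return byId;
  }
}

// inside the wrapper's RichMapFunction:
// Map<String, MetaData> meta = getRuntimeContext()
//     .getBroadcastVariableWithInitializer("meta", new MetaMapInitializer());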

2016-05-06 14:07 GMT+02:00 Flavio Pompermaier :

> That was more or less what I was thinking. The only thing I'm not sure about
> is the usage of the broadcast dataset, since I'd need to access the
> MetaData dataset by sourceId (so I'd need a Map).
> Probably I'd do:
>
> Map meta = ...; // prepare the metadata lookup table
> ...
> ds.map(new MetaMapFunctionWrapper(new MetaMapFunction(meta)))
>
> What do you think? Is there a possibility to open a broadcast DataSet
> as a Map instead of a List?
>
> Best,
> Flavio
>
>
> On Fri, May 6, 2016 at 12:06 PM, Fabian Hueske  wrote:
>
>> Hi Flavio,
>>
>> I'll open a JIRA for de/serializing TableSource to textual JSON.
>>
>> Would something like this work for you?
>>
>> main() {
>>   ExecutionEnvironment env = ...
>>   TableEnvironment tEnv = ...
>>
>>   // accessing an external catalog
>>   YourTableSource ts = Catalog.getTableSource("someIdentifier");
>>   tEnv.registerTableSource("someId", ts);
>>
>>   // preparing meta data
>>   MetaData meta = ts.getMetaData()
>>   DataSet metaDS = env.fromElements(meta);
>>
>>   // read data, table transformations + conversion to DataSet
>>   Table t = tEnv.scan("someId"); // apply some Table transformations if
>> necessary
>>   DataSet> ds = tEnv.toDataSet(t, TupleX);
>>
>>   // apply custom functions on data set
>>   ds.map(MetaMapFunctionWrapper(new
>> MetaMapFunction())).withBroadcastSet(metaDS, "meta");
>>
>>   // continue program
>>
>> }
>>
>> The YourMapFunctionWrapper could be a RichMapFunction that accesses the
>> meta data from the broadcasted set and provides it to a wrapped
>> MetaMapFunction (an extended MapFunction with custom interface for meta
>> data).
>>
>> Depending on what kind of interface you plan to offer, you can hide most
>> of the complexity, e.g, users would only have to implement a
>> MetaMapFunction not have to deal with the broadcasting and accessing of
>> meta data (this would be done by your wrapper).
>>
>> Fabian
>>
>>
>>
>> 2016-05-05 10:08 GMT+02:00 Flavio Pompermaier :
>>
>>> HI Fabian,
>>> thanks for your detailed answer, as usual ;)
>>>
>>> I think that an external service it's ok,actually I wasn't aware of the
>>> TableSource interface.
>>> As you said, an utility to serialize and deserialize them would be very
>>> helpful and will ease this thing.
>>> However, registering metadata for a table is a very common task to do.
>>> Wouldn't be of useful for other Flink-related projects (I was thinking to
>>> Nifi for example) to define a common minimal set of (optional) metadata to
>>> display in a UI for a TableSource (like name, description, creationDate,
>>> creator, field aliases)?
>>>
>>> About point 2, I think that dataset broadcasting or closure variables
>>> are useful when you write a program, not if you try to "compose" it using
>>> reusable UDFs (using a script like in Pig).
>>> Of course, the worst case scenario for us (e.g. right now) is to connect
>>> to our repository within rich operators but I thought that it could be easy
>>> to define a link from operators to TableEnvironment and then to TableSource
>>> (using the lineage tag/source-id you said) and, finally to its metadata. I
>>> don't know whether this is specific only to us, I just wanted to share our
>>> needs and see if the table API development could benefit from them.
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, May 4, 2016 at 10:35 AM, Fabian Hueske 
>>> wrote:
>>>
>>>> Hi Flavio,
>>>>
>>>> I thought a bit about your proposal. I am not sure if it is actually
>>>> necessary to integrate a central source repository into Flink. It should be
>>>> possible to offer this as an external service which is based on the
>>>> recently added TableSource interface. TableSources could be extended to be
>>>> able to serialize and descerialize their configuration to/from JSON. When
>>>> the external repository service starts, it can read the JSON fields and
>>>> instantiate and register TableSource objectes. The repository could also
>>>> hold metadata about the sources and serve a (w

Re: How to make Flink read all files in HDFS folder and do transformations on the data

2016-05-07 Thread Fabian Hueske
Hi Palle,

you can recursively read all files in a folder as explained in the
"Recursive Traversal of the Input Path Directory" section of the Data
Source documentation [1].

The easiest way to read line-wise JSON objects is to use
ExecutionEnvironment.readTextFile() which reads text files linewise as
strings and a subsequent mapper that uses a JSON parser (e.g., Jackson) to
parse the JSON strings. You should use a RichMapFunction and create the
parser in the open() method to avoid instantiating a new parser for each
incoming line. After parsing, the RichMapFunction can emit POJOs.
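
For example, a rough sketch (MyPojo is a placeholder for your own POJO class;
the "recursive.file.enumeration" parameter enables the recursive traversal
described in [1]):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class ReadJsonFolder {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read all files in the folder, including nested directories
    Configuration params = new Configuration();
    params.setBoolean("recursive.file.enumeration", true);
    DataSet<String> lines = env.readTextFile("hdfs:///data/json-folder").withParameters(params);

    DataSet<MyPojo> pojos = lines.map(new RichMapFunction<String, MyPojo>() {
      private transient ObjectMapper mapper;

      @Override
      public void open(Configuration parameters) {
        mapper = new ObjectMapper();   // create the parser once per task, not per line
      }

      @Override
      public MyPojo map(String line) throws Exception {
        return mapper.readValue(line, MyPojo.class);
      }
    });

    // continue with flatMap/groupBy/reduceGroup as in the WordCount example
    pojos.print();
  }
}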

Cheers, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#data-sources

2016-05-07 12:25 GMT+02:00 Flavio Pompermaier :

> I had the same issue :)
> I resolved it reading all file paths in a collection, then using this code:
>
> env.fromCollection(filePaths).rebalance().map(file2pojo)
>
> You can have your dataset of Pojos!
>
> The rebalance() is necessary to exploit parallelism, otherwise the pipeline
> will be executed with parallelism 1.
>
> Best,
> Flavio
> On 7 May 2016 12:13, "Palle"  wrote:
>
> Hi there.
>
> I've got a HDFS folder containing a lot of files. All files contains a lot
> of JSON objects, one for each line. I will have several TB in the HDFS
> folder.
>
> My plan is to make Flink read all files and all JSON objects and then do
> some analysis on the data, actually very similar to the
> flatMap/groupBy/reduceGroup transformations that is done in the WordCount
> example.
>
> But I am a bit stuck, because I cannot seem to find out how to make Flink
> read all files in a HDFS dir and then perform the transformations on the
> data. I have googled quite a bit and also looked in the Flink API and mail
> history.
>
> Can anyone point me to an example where Flink is used to read all files in
> an HDFS folder and then do transformations on the data?
>
> - and a second question: Is there an elegant way to make Flink handle the
> JSON objects? - can they be converted to POJOs by something similar to the
> pojoType() method?
>
> /Palle
>
>


Re: Creating a custom operator

2016-05-09 Thread Fabian Hueske
Hi Simone,

sorry for the delayed answer. I have a few questions regarding your
requirements and a some ideas that might be helpful (depending on the
requirements).

1) Starting / stopping of H2O nodes from Flink
- You wrote you'd like to "instantiate a H2O's node in every task manager".
This reads a bit like you want to start H2O in the TM's JVM, but
assume that a H2O node runs as a separate process. So should it be started
inside the TM JVM or as an external process next to each TM. Also, do you
want to start one H2O node per TM slot or per TM?
- You wrote you'd like to "handle the lifecycle of the node according to
the taskmanager and the execution graph". A TM can execute multiple jobs
each with its own execution graph. Do you want to start the H2O node for
each job and shut it down when the job finishes or start the H2O when the
TM is started and kill it when the TM is brought down?
- "keep the H2O's nodes alive through multiple tasks" The first option
(starting for each job) would allow to share the H2O node for all tasks of
a job. This could be done using two MapPartition operators, the first
Mapper is put in front of the first task requiring H2O starting an H2O
service before the first record is forwarded and the second task is put
after the last H2O task and shuts it down after the last element was
forwarded. The mappers itself do nothing than forwarding elements and
starting and stopping tasks. If you would like to share H2O nodes across
jobs, we might need another hook to start the process.
- "move the data locally from Flink to H2O", do you mean host local or JVM
local? I think it should not be hard to keep the data host local.

2) "Allow the developer to code everything inside Flink".
- The Table API which you are referring to in your example is built on top
of the DataSet and DataStream APIs. I think it should be possible to add
another API similar to the Table API. You should be aware that the Table
API is currently quite actively developed and should not be considered to
be a stable interface. So certain things might change in the next versions.
With 1.0 we stabilized the DataSet API and I would rather put a new API on
top of it than on the Table API.
- Regarding the transformation in H2O structures and calling H2O
operations, I think this might again be done in MapPartition operators. In
general, MapPartition gives you a lot of freedom because it provides an
iterator over all elements of a partition. So you can do things before the
first and after the last element and group data as you like. You can use
partitionByHash() or rebalance() to shuffle data and sortPartition() to
locally sort the data in a partition. Please note that MapPartition
operators do not support chaining and therefore come with a certain
serialization overhead. Whenever possible you should use a MapFunction or
FlatMapFunction which are a bit more lightweight.
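
For illustration, a rough sketch of such a forwarding MapPartition operator
(H2OService and its start() hook are hypothetical placeholders for whatever
lifecycle calls H2O actually needs; a mirror operator after the H2O section
would call the corresponding shutdown hook):

import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.util.Collector;

// first of the two MapPartition "brackets": starts the service before the
// first record is forwarded and otherwise just passes records through
public class ServiceStartMapper<T> extends RichMapPartitionFunction<T, T> {

  @Override
  public void mapPartition(Iterable<T> records, Collector<T> out) throws Exception {
    H2OService.start();        // hypothetical per-TaskManager lifecycle hook
    for (T record : records) {
      out.collect(record);     // pure forwarding, no transformation
    }
  }
}

// usage (the returns() hint is needed because of the generic parameter):
// input.mapPartition(new ServiceStartMapper<MyType>()).returns(input.getType());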

Hope this helps,
Fabian


2016-05-03 15:13 GMT+02:00 Simone Robutti :

> I'm not sure this is the right way to do it but we were exploring all the
> possibilities and this one is the more obvious. We also spent some time to
> study how to do it to achieve a better understanding of Flink's internals.
>
> What we want to do though is to integrate Flink with another distributed
> system that builds its own nodes and coordinates through the network with
> its own logic. This software is H2O (a Machine Learning platform) and the
> integration consists of two big tasks: the first is to instantiate a H2O's
> node in every task manager and handle the lifecycle of the node according
> to the taskmanager and the execution graph. The second is to allow the
> developer to code everything inside Flink, converting from and to H2O's
> data structures (distributed tabular data) and triggering the execution of
> algorithms on H2O with a uniform API.
>
> Here's a simple example (assuming that we will use the TableAPI):
>
> val env = ExecutionEnvironment.getExecutionEnvironment
> val h2oEnv = H2OEnviroment.getEnvironment(env)
>
> val myData: Table = ...
> val someOtherData: Table = ...
>
> val myH2OFrame = myData.select(...).toH2OFrame(h2oEnv)
>
> val linearRegressionModel = h2oEnv.linearRegression(myH2OFrame)
>
> val predictions:Table=linearRegressionModel(someOtherData)
>
> predictions.select(...)
>
>
> A good solution should allow the system to keep the H2O's nodes alive
> through multiple tasks and the possibility to move the data locally from
> Flink to H2O. The latter is not achieved in H2O's integration with Spark
> but we still hope to do it.
>
> That said, I'm still not sure if it is really required to implement a
> custom runtime operator but given the complexity of the integration of two
> distribute systems, we assumed that more cont

Re: Force triggering events on watermark

2016-05-10 Thread Fabian Hueske
Maybe the last example of this blog post is helpful [1].

Best, Fabian

[1]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink

2016-05-10 17:24 GMT+02:00 Srikanth :

> Hi,
>
> I read the following in Flink doc "We can explicitly specify a Trigger to
> overwrite the default Trigger provided by the WindowAssigner. Note that
> specifying a triggers does not add an additional trigger condition but
> replaces the current trigger."
> So, I tested out the below code with count trigger. As per my
> understanding this will override the default watermark based trigger.
>
> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
> 4),
>  ("2016-04-07 13:11:59", 157428, 4),
>  ("2016-04-07 13:11:59", 111283, 23),
>  ("2016-04-07 13:11:57", 108042, 23),
>  ("2016-04-07 13:12:00", 161374, 9),
>  ("2016-04-07 13:12:00", 161374, 9),
>  ("2016-04-07 13:11:59", 136505, 4)
> )
> )
>.assignAscendingTimestamps(b => f.parse(b._1).getTime())
>.map(b => (b._3, b._2))
>
> testStream.print
>
> val countStream = testStream
> .keyBy(_._1)
> .timeWindow(Time.seconds(20))
> .trigger(CountTrigger.of(3))
> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>
> countStream.print
>
> Output I saw confirms the documented behavior. Processing is triggered
> only when we have 3 elements for a key.
> How do I force trigger the left over records when watermark is past the
> window? I.e, I want to use triggers to start early processing but finalize
> the window based on watermark.
>
> Output shows that records for keys 23 & 9 weren't processed.
>   (4,157428)
>   (4,157428)
>   (23,111283)
>   (23,108042)
>   (9,161374)
>   (9,161374)
>   (4,136505)
>
>   (4,List(157428, 157428, 136505))
>
> Thanks,
> Srikanth
>


Re: Creating a custom operator

2016-05-11 Thread Fabian Hueske
2016-05-09 14:56 GMT+02:00 Simone Robutti :

> >- You wrote you'd like to "instantiate a H2O's node in every task
> manager". This reads a bit like you want to start H2O in the TM's JVM , but
> I would assume that a H2O node runs as a separate process. So should it be
> started inside the TM JVM or as an external process next to each TM. Also,
> do you want to start one H2O node per TM slot or per TM?
>
> My idea is to run it in the same process but there may be several good
> reasons not to do it, it's just the way I think of it right now. I'm
> thinking about replicating the structure of Sparkling Water and for my
> understanding, they run their H2O nodes in the same process.
>
That should be possible by starting a thread from a MapPartition operator.
To make it one H2O node per TM, you would need a synchronized singleton to
prevent each parallel task from starting a new thread.


>
> >- You wrote you'd like to "handle the lifecycle of the node according to
> the taskmanager and the execution graph". A TM can execute multiple jobs
> each with its own execution graph. Do you want to start the H2O node for
> each job and shut it down when the job finishes or start the H2O when the
> TM is started and kill it when the TM is brought down?
>
> There are different trade-offs for both choices. I assume that there's
> nothing inside H2O that should be shared between different jobs for most
> use cases so it should follow the job's lifecycle. In the previous mail
> this was ambigous, my bad.
>

The approach with two MapPartition operators at the beginning and end of
the H2O section might work then.


> >- "move the data locally from Flink to H2O", do you mean host local or
> JVM local? I think it should not be hard to keep the data host local.
>
> JVM local. This is clearly not an issue flink-side but may be an issue on
> H2O's side. It's one of the many issues we will tackle as soon as we will
> talk with them (I hope soon).
>
> >- The Table API which you are referring to in your example is built on
> top of the DataSet and DataStream APIs. I think it should be possible to
> add another API similar to the Table API. You should be aware that the
> Table API is currently quite actively developed and should not be
> considered to be a stable interface. So certain things might change in the
> next versions. With 1.0 we stabilized the DataSet API and I would rather
> put a new API on top of it than on the Table API.
>
> We know but we will work mostly with the data abstractions of the Table
> API and not the operations. We take the risk to rework it if they change in
> the meantime.
>
> Your reply really helped: many questions helped us clear our minds on a few
> points. H2O's team showed interest in working on this integration or at
> least supporting us in the development. We are waiting for them to start a
> discussion and as soon as we have a clearer idea of how to proceed,
> we will validate it against the things you just said. Your confidence in
> Flink's operators gives us hope of achieving a clean solution.
>
> Thanks a lot for your time,
>
> Simone

Sure :-)

Cheers, Fabian


> 2016-05-09 12:24 GMT+02:00 Fabian Hueske :
>
>> Hi Simone,
>>
>> sorry for the delayed answer. I have a few questions regarding your
>> requirements and a some ideas that might be helpful (depending on the
>> requirements).
>>
>> 1) Starting / stopping of H2O nodes from Flink
>> - You wrote you'd like to "instantiate a H2O's node in every task
>> manager". This reads a bit like you want to start H2O in the TM's JVM , but
>> I would assume that a H2O node runs as a separate process. So should it be
>> started inside the TM JVM or as an external process next to each TM. Also,
>> do you want to start one H2O node per TM slot or per TM?
>> - You wrote you'd like to "handle the lifecycle of the node according to
>> the taskmanager and the execution graph". A TM can execute multiple jobs
>> each with its own execution graph. Do you want to start the H2O node for
>> each job and shut it down when the job finishes or start the H2O when the
>> TM is started and kill it when the TM is brought down?
>> - "keep the H2O's nodes alive through multiple tasks" The first option
>> (starting for each job) would allow to share the H2O node for all tasks of
>> a job. This could be done using two MapPartition operators, the first
>> Mapper is put in front of the first task requiring H2O starting an H2O
>> service before the first record is forwarded and the second task is put
>> after the last H2O ta

Re: get start and end time stamp from time window

2016-05-12 Thread Fabian Hueske
Hi Martin,

You can use a FoldFunction and a WindowFunction to process the same
window. The FoldFunction is eagerly applied, so the window state is only
one element. When the window is closed, the aggregated element is given to
the WindowFunction where you can add start and end time. The iterator of
the WindowFunction will provide only one (the aggregated) element.

See the apply method on WindowedStream with the following signature:
apply(initialValue: R, foldFunction: FoldFunction[T, R], function:
WindowFunction[R, R, K, W]): DataStream[R]
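
A rough Java sketch of that pattern (input type, window size, and fold logic
are placeholders; since the fold result type and the output type must be the
same, the start and end timestamps are part of the accumulator and are filled
in by the WindowFunction):

// input: (key, value) pairs; output: (windowStart, windowEnd, sum) per key and window
DataStream<Tuple2<String, Long>> input = ...;

DataStream<Tuple3<Long, Long, Long>> result = input
  .keyBy(0)
  .timeWindow(Time.minutes(1))
  .apply(
    Tuple3.of(0L, 0L, 0L),   // initial value: (start, end, aggregate)
    new FoldFunction<Tuple2<String, Long>, Tuple3<Long, Long, Long>>() {
      @Override
      public Tuple3<Long, Long, Long> fold(Tuple3<Long, Long, Long> acc,
                                           Tuple2<String, Long> value) {
        acc.f2 += value.f1;   // eager aggregation, window state is a single Tuple3
        return acc;
      }
    },
    new WindowFunction<Tuple3<Long, Long, Long>, Tuple3<Long, Long, Long>, Tuple, TimeWindow>() {
      @Override
      public void apply(Tuple key, TimeWindow window,
                        Iterable<Tuple3<Long, Long, Long>> folded,
                        Collector<Tuple3<Long, Long, Long>> out) {
        Tuple3<Long, Long, Long> agg = folded.iterator().next();  // exactly one element
        agg.f0 = window.getStart();
        agg.f1 = window.getEnd();
        out.collect(agg);
      }
    });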

Cheers, Fabian

2016-05-11 20:16 GMT+02:00 Martin Neumann :

> Hej,
>
> I have a windowed stream and I want to run a (generic) fold function on
> it. The result should have the start and the end time stamp of the window
> as fields (so I can relate it to the original data). *Is there a simple
> way to get the timestamps from within the fold function?*
>
> I could find the lowest and the highest ts as part of the fold function
> but that would not be very accurate, especially when the number of events
> in the window is low. Also, I want to write in a generic way so I can use
> it even if the data itself does not contain a time stamp field (running on
> processing time).
>
> I have looked into using a WindowFunction where I would have access to the
> start and end timestamp. I have not quite figured out how I would implement
> a fold function using this. Also, from my understanding this approach would
> require holding the whole window in memory which is not a good option since
> the window data can get very large.
>
> Is there a better way of doing this
>
>
> cheers Martin
>


Re: Flink + Avro GenericRecord - first field value overwrites all other fields

2016-05-12 Thread Fabian Hueske
Hi Tarandeep,

the AvroInputFormat was recently extended to support GenericRecords. [1]
You could also try to run the latest SNAPSHOT version and see if it works
for you.

Cheers, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3691

2016-05-12 10:05 GMT+02:00 Tarandeep Singh :

> I think I found a workaround. Instead of reading Avro files as
> GenericRecords, if I read them as specific records and then use a map to
> convert (typecast) them as GenericRecord, the problem goes away.
>
> I ran some tests and so far this workaround seems to be working in my
> local setup.
>
> -Tarandeep
>
> On Wed, May 11, 2016 at 10:24 PM, Tarandeep Singh 
> wrote:
>
>> Hi,
>>
>> I am using DataSet API and reading Avro files as DataSet.
>> I am seeing this weird behavior that record is read correctly from file
>> (verified by printing all values) but when when this record is passed to
>> Flink chain/DAG (e.g. KeySelector), every field in this record has the same
>> value as the first field value. Even more weird is they values are of
>> different types, e.g. I have a record Query with two fields key (integer)
>> and query (String). When the record was read from file, correct values were
>> read (e.g. 100, "apache flink"). But when I print/check values in
>> KeySelector, I get (100, 100).
>>
>> I saw similar post on stackoverflow-
>>
>> http://stackoverflow.com/questions/37115618/apache-flink-union-operator-giving-wrong-response
>>
>> Any idea what might be happening?
>> Any workaround will be greatly appreciated.
>>
>> Thank you,
>> Tarandeep
>>
>>
>


Re: Flink recovery

2016-05-13 Thread Fabian Hueske
Hi,

Flink's exactly-once semantics do not mean that events are processed
exactly-once but that events will contribute exactly-once to the state of
an operator such as a counter.
Roughly, the mechanism works as follows:
- Flink periodically injects checkpoint markers into the data stream. This
happens synchronously across all sources.
- When an operator receives a checkpoint marker from all its sources, it
checkpoints its state and forwards the marker
- When the marker was received by all sinks, the distributed checkpoint is
noted as successful.

In case of a failure, the state of all operators is reset to the last
successful checkpoint and the sources are reset to the point when the
marker was injected.
Hence, some events are sent a second time to the operators but the state of
the operators was reset as well. So the repeated events contribute exactly
once to the state of an operator.

Note, you need a SinkFunction that supports Flink's checkpointing mechanism
to achieve exactly-once output. Otherwise, it might happen that results are
emitted multiple times.

Cheers, Fabian

2016-05-13 22:58 GMT+02:00 Madhire, Naveen :

> I checked the JIRA and looks like FLINK-2111 should address the issue
> which I am facing. I am canceling the job from dashboard.
>
> I am using kafka source and HDFS rolling sink.
>
> https://issues.apache.org/jira/browse/FLINK-2111
>
> Is this JIRA part of Flink 1.0.0?
>
>
>
> Thanks,
> Naveen
>
> From: "Madhire, Venkat Naveen Kumar Reddy" 
> Reply-To: "user@flink.apache.org" 
> Date: Friday, May 13, 2016 at 10:58 AM
> To: "user@flink.apache.org" 
> Subject: Flink recovery
>
> Hi,
>
> We are trying to test the recovery mechanism of Flink with Kafka and HDFS
> sink during failures.
>
> I’ve killed the job after processing some messages and restarted the same
> job again. Some of the messages I am seeing are processed more than once
> and not following the exactly once semantics.
>
>
> Also, using the checkpointing mechanism and saving the state checkpoints
> into HDFS.
> Below is the checkpoint code,
>
> envStream.enableCheckpointing(11);
> envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> envStream.getCheckpointConfig().setCheckpointTimeout(6);
> envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);
>
> envStream.setStateBackend(new 
> FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));
>
>
> One thing I’ve noticed is lowering the time to checkpointing is actually
> lowering the number of messages processed more than once and 11ms is the
> lowest I can use.
>
> Is there anything else I should try to have exactly once message
> processing functionality.
>
> I am using Flink 1.0.0 and kafka 0.8
>
>
> Thank you.
>
>


Re: Flink recovery

2016-05-13 Thread Fabian Hueske
Hi Naveen,

the RollingFileSink supports exactly-once output. So you should be good.

Did you see events being emitted multiple times (should not happen with the
RollingFileSink) or being processed multiple times within the Flink program
(might happen as explained before)?

Best, Fabian

2016-05-13 23:19 GMT+02:00 Madhire, Naveen :

> Thank you Fabian.
>
> I am using HDFS rolling sink. This should support the exactly once output
> in case of failures, isn’t it? I am following the below documentation,
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks
>
> If not what other Sinks can I use to have the exactly once output since
> getting exactly once output is critical for our use case.
>
>
>
> Thanks,
> Naveen
>
> From: Fabian Hueske 
> Reply-To: "user@flink.apache.org" 
> Date: Friday, May 13, 2016 at 4:13 PM
> To: "user@flink.apache.org" 
> Subject: Re: Flink recovery
>
> Hi,
>
> Flink's exactly-once semantics do not mean that events are processed
> exactly-once but that events will contribute exactly-once to the state of
> an operator such as a counter.
> Roughly, the mechanism works as follows:
> - Flink peridically injects checkpoint markers into the data stream. This
> happens synchronously across all sources and markers.
> - When an operator receives a checkpoint marker from all its sources, it
> checkpoints its state and forwards the marker
> - When the marker was received by all sinks, the distributed checkpoint is
> noted as successful.
>
> In case of a failure, the state of all operators is reset to the last
> successful checkpoint and the sources are reset to the point when the
> marker was injected.
> Hence, some events are sent a second time to the operators but the state
> of the operators was reset as well. So the repeated events contribute
> exactly once to the state of an operator.
>
> Note, you need a SinkFunction that supports Flink's checkpointing
> mechanism to achieve exactly-once output. Otherwise, it might happen that
> results are emitted multiple times.
>
> Cheers, Fabian
>
> 2016-05-13 22:58 GMT+02:00 Madhire, Naveen 
> :
>
>> I checked the JIRA and looks like FLINK-2111 should address the issue
>> which I am facing. I am canceling the job from dashboard.
>>
>> I am using kafka source and HDFS rolling sink.
>>
>> https://issues.apache.org/jira/browse/FLINK-2111
>>
>> Is this JIRA part of Flink 1.0.0?
>>
>>
>>
>> Thanks,
>> Naveen
>>
>> From: "Madhire, Venkat Naveen Kumar Reddy" > >
>> Reply-To: "user@flink.apache.org" 
>> Date: Friday, May 13, 2016 at 10:58 AM
>> To: "user@flink.apache.org" 
>> Subject: Flink recovery
>>
>> Hi,
>>
>> We are trying to test the recovery mechanism of Flink with Kafka and HDFS
>> sink during failures.
>>
>> I’ve killed the job after processing some messages and restarted the same
>> job again. Some of the messages I am seeing are processed more than once
>> and not following the exactly once semantics.
>>
>>
>> Also, using the checkpointing mechanism and saving the state checkpoints
>> into HDFS.
>> Below is the checkpoint code,
>>
>> envStream.enableCheckpointing(11);
>> envStream.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> envStream.getCheckpointConfig().setCheckpointTimeout(6);
>> envStream.getCheckpointConfig().setMaxConcurrentCheckpoints(4);
>>
>> envStream.setStateBackend(new 
>> FsStateBackend("hdfs://ipaddr/mount/cp/checkpoint/"));
>>
>>
>> One thing I’ve noticed is lowering the time to checkpointing is actually
>> lowering the number of messages processed more than once and 11ms is the
>> lowest I can use.
>>
>> Is there anything else I should try to have exactly once message
>> processing functionality.
>>
>> I am using Flink 1.0.0 and kafka 0.8
>>
>>
>> Thank you.
>>

Re: Flink recovery

2016-05-14 Thread Fabian Hueske
The behavior of the RollingFileSink depends on the capabilities of the file
system.
If the file system does not support truncating files, as in older HDFS
versions, an additional file with a .valid-length suffix is written to
indicate how much of the file is valid.
All records / data that come after the valid-length are duplicates.
Please refer to the JavaDocs of the RollingFileSink for details [1].
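
For illustration, a rough sketch of how a consumer could honor the marker
before processing a part file (local files for brevity; it assumes the
.valid-length file stores the valid byte count as a plain decimal string,
which should be verified against the JavaDocs of your Flink version):

import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class TrimToValidLength {
  public static void main(String[] args) throws Exception {
    Path part = Paths.get("part-0-0");                        // placeholder file names
    Path marker = Paths.get("part-0-0.valid-length");
    if (Files.exists(marker)) {
      long validLength = Long.parseLong(
          new String(Files.readAllBytes(marker), StandardCharsets.UTF_8).trim());
      try (FileChannel channel = FileChannel.open(part, StandardOpenOption.WRITE)) {
        channel.truncate(validLength);   // everything after this offset is duplicate data
      }
    }
  }
}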

If the .valid-length file does not solve the problem, you might have found
a bug and we should have a closer look at the problem.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html

2016-05-14 4:17 GMT+02:00 Madhire, Naveen :

> Thanks Fabian. Yes, I am seeing a few records more than once in the output.
> I am running the job and canceling it from the dashboard, and running
> again. And using different HDFS file outputs both the times. I was thinking
> when I cancel the job, it’s not doing a clean cancel.
> Is there anything else which I have to use to make it exactly once in the
> output?
>
> I am using a simple read from kafka, transformations and rolling file sink
> pipeline.
>
>
>
> Thanks,
> Naveen
>
> From: Fabian Hueske 
> Reply-To: "user@flink.apache.org" 
> Date: Friday, May 13, 2016 at 4:26 PM
>
> To: "user@flink.apache.org" 
> Subject: Re: Flink recovery
>
> Hi Naveen,
>
> the RollingFileSink supports exactly-once output. So you should be good.
>
> Did you see events being emitted multiple times (should not happen with
> the RollingFileSink) or being processed multiple times within the Flink
> program (might happen as explained before)?
>
> Best, Fabian
>
> 2016-05-13 23:19 GMT+02:00 Madhire, Naveen 
> :
>
>> Thank you Fabian.
>>
>> I am using HDFS rolling sink. This should support the exactly once output
>> in case of failures, isn’t it? I am following the below documentation,
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/fault_tolerance.html#fault-tolerance-guarantees-of-data-sources-and-sinks
>>
>> If not what other Sinks can I use to have the exactly once output since
>> getting exactly once output is critical for our use case.
>>
>>
>>
>> Thanks,
>> Naveen
>>
>> From: Fabian Hueske 
>> Reply-To: "user@flink.apache.org" 
>> Date: Friday, May 13, 2016 at 4:13 PM
>> To: "user@flink.apache.org" 
>> Subject: Re: Flink recovery
>>
>> Hi,
>>
>> Flink's exactly-once semantics do not mean that events are processed
>> exactly-once but that events will contribute exactly-once to the state of
>> an operator such as a counter.
>> Roughly, the mechanism works as follows:
>> - Flink peridically injects checkpoint markers into the data stream. This
>> happens synchronously across all sources and markers.
>> - When an operator receives a checkpoint marker from all its sources, it
>> checkpoints its state and forwards the marker
>> - When the marker was received by all sinks, the distributed checkpoint
>> is noted as successful.
>>
>> In case of a failure, the state of all operators is reset to the last
>> successful checkpoint and the sources are reset to the point when the
>> marker was injected.
>> Hence, some events are sent a second time to the operators but the state
>> of the operators was reset as well. So the repeated events contribute
>> exactly once to the state of an operator.
>>
>> Note, you need a SinkFunction that supports Flink's checkpointing
>> mechanism to achieve exactly-once output. Otherwise, it might happen that
>> results are emitted multiple times.
>>
>> Cheers, Fabian
>>
>> 2016-05-13 22:58 GMT+02:00 Madhire, Naveen > >:
>>
>>> I checked the JIRA and looks like FLINK-2111 should address the issue
>>> which I am facing. I am canceling the job from dashboard.
>>>
>>> I am using kafka source and HDFS rolling sink.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-2111
>>>
>>> Is this JIRA part of Flink 1.0.0?
>>>
>>>
>>>
>>> Thanks,
>>> Naveen
>>>
>>> From: "Madhire, Venkat Naveen Kumar Reddy" <
>>> naveen.madh...@capitalone.com>
>>> Reply-To: "user@flink.apache.org" 
>>> Date: Friday, May 13, 2016 at 10:58 AM
>>> To: "user@flink.apache.org" 
>>> Subject: Flink recovery
>>>
>>> Hi,
>>>
>>> We are trying to test the recovery mechanism of Flink with Kafka and
>>

Re: Flink Kafka Streaming - Source [Bytes/records Received] and Sink [Bytes/records sent] show zero messages

2016-05-16 Thread Fabian Hueske
Hi Prateek,

the missing numbers are an artifact of how the stats are collected.
At the moment, Flink only collects these metrics for data that is sent over
connections *between* Flink operators.
Since sources and sinks connect to external systems (and not to Flink
operators), the dashboard does not show received bytes for sources or
sent bytes for sinks.

Best, Fabian

2016-05-16 23:02 GMT+02:00 prateekarora :

> Hi
>
> In my flink kafka streaming application i am fetching data from one topic
> and then process and sent to output topic . my application is working fine
> but  flink dashboard shows Source [Bytes/records Received]  and Sink
> [Bytes/records sent] is zero.
>
>
> Duration | Name                  | Bytes Received | Records Received | Bytes Sent | Records Sent | Status
> 3m 15s   | Source: Custom Source | 0 B            | 0                | 249 MB     | 3,540        | RUNNING
> 3m 15s   | Map -> Sink: Unnamed  | 203 MB         | 2,883            | 0 B        | 0            | RUNNING
>
>
>
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n6941/flink-kafka.png>
>
> What is the reason for not showing the correct numbers?
>
> Regards
> Prateek
>
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kafka-Streaming-Source-Bytes-records-Received-and-Sink-Bytes-records-sent-show-zero-messages-tp6941.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Flink recovery

2016-05-17 Thread Fabian Hueske
Thanks for reporting back Naveen!



2016-05-17 18:55 GMT+02:00 Madhire, Naveen :

> Hi Robert, With the use of manual save points, I was able to obtain
> exactly-once output with Kafka and HDFS rolling sink.
>
> Thanks to you and Fabian for the help.
>
>
> From: Robert Metzger 
> Reply-To: "user@flink.apache.org" 
> Date: Tuesday, May 17, 2016 at 10:02 AM
>
> To: "user@flink.apache.org" 
> Subject: Re: Flink recovery
>
> Hi,
>
> Savepoints are exactly for that use case:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html
>
> http://data-artisans.com/how-apache-flink-enables-new-streaming-applications/
>
> Regards,
> Robert
>
> On Tue, May 17, 2016 at 4:25 PM, Madhire, Naveen <
> naveen.madh...@capitalone.com> wrote:
>
>> Hey Robert,
>>
>> What is the best way to stop a streaming job in production if I want to
>> upgrade the application without losing messages or causing duplicates?
>> How can I test this scenario?
>> We are testing few recovery mechanisms like job failure, application
>> upgrade and node failure.
>>
>>
>>
>> Thanks,
>> Naveen
>>
>> From: Robert Metzger 
>> Reply-To: "user@flink.apache.org" 
>> Date: Tuesday, May 17, 2016 at 6:58 AM
>> To: "user@flink.apache.org" 
>> Subject: Re: Flink recovery
>>
>> Hi Naveen,
>>
>> I think cancelling a job is not the right approach for testing our
>> exactly-once guarantees. By cancelling a job, you are discarding the state
>> of your job. Restarting from scratch (without using a savepoint) will cause
>> duplicates.
>> What you can do to validate the behavior is randomly killing a task
>> manager running your job. Then, the job should restart on the remaining
>> machines (make sure that enough slots are available even after the failure)
>> and you shouldn't have any duplicates in HDFS.
>>
>> Regards,
>> Robert
>>
>>
>>
>>
>>
>> On Tue, May 17, 2016 at 11:27 AM, Stephan Ewen  wrote:
>>
>>> Hi Naveen!
>>>
>>> I assume you are using Hadoop 2.7+? Then you should not see the
>>> ".valid-length" file.
>>>
>>> The fix you mentioned is part of later Flink releases (like 1.0.3)
>>>
>>> Stephan
>>>
>>>
>>> On Mon, May 16, 2016 at 11:46 PM, Madhire, Naveen <
>>> naveen.madh...@capitalone.com> wrote:
>>>
>>>> Thanks Fabian. Actually I don’t see a .valid-length suffix file in the
>>>> output HDFS folder.
>>>> Can you please tell me how would I debug this issue or do you suggest
>>>> anything else to solve this duplicates problem.
>>>>
>>>>
>>>> Thank you.
>>>>
>>>> From: Fabian Hueske 
>>>> Reply-To: "user@flink.apache.org" 
>>>> Date: Saturday, May 14, 2016 at 4:10 AM
>>>> To: "user@flink.apache.org" 
>>>> Subject: Re: Flink recovery
>>>>
>>>> The behavior of the RollingSink depends on the capabilities of the
>>>> file system.
>>>> If the file system does not support truncating files, such as older
>>>> HDFS versions, an additional file with a .valid-length suffix is written to
>>>> indicate how much of the file is valid.
>>>> All records / data that come after the valid length are duplicates.
>>>> Please refer to the JavaDocs of the RollingFileSink for details [1].
>>>>
>>>> If the .valid-length file does not solve the problem, you might have
>>>> found a bug and we should have a closer look at the problem.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/fs/RollingSink.html
>>>>
>>>> 2016-05-14 4:17 GMT+02:00 Madhire, Naveen <
>>>> naveen.madh...@capitalone.com>:
>>>>
>>>>> Thanks Fabian. Yes, I am seeing few records more than once in the
>>>>> output.
>>>>> I am running the job and canceling it from the dashboard, and running
>>>>> again. And using different HDFS file outputs both the times. I was 
>>>>> thinking
>>>>> when I cancel the job, it’s not doing a clean cancel.
>>>>> Is there anything else which I have to use to make it exactly once in
>>>>> the output?
>>>>>
>>>>> I

Re: Performing Reduce on a group of datasets

2016-05-18 Thread Fabian Hueske
I think union is what you are looking for.
Note that all data sets must be of the same type.
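A minimal sketch (assuming DataSet<Tuple2<String, Integer>> elements and a sum
aggregation; both are assumptions, not taken from your program):

DataSet<Tuple2<String, Integer>> unioned = d1;
for (DataSet<Tuple2<String, Integer>> ds : Arrays.asList(d2, d3)) {
    unioned = unioned.union(ds); // union is cheap: no shuffle, no materialization
}
DataSet<Tuple2<String, Integer>> result = unioned
        .groupBy(0) // group by the first tuple field
        .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                return new Tuple2<>(a.f0, a.f1 + b.f1);
            }
        });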

2016-05-18 16:15 GMT+02:00 Ritesh Kumar Singh :

> Hi,
>
> How can I perform a reduce operation on a group of datasets using Flink?
> Let's say my map function gives out n datasets: d1, d2, ... dN
> Now I wish to perform my reduce operation on all the N datasets at once
> and not on an individual level. The only way I figured out till now is
> using the union operator first like following:
>
> List<DataSet<T>> dataList = Arrays.asList(d1, d2, ... dN);
> DataSet<T> dFinal = dataList.get(0);
> for (DataSet<T> ds : dataList.subList(1, dataList.size())) {
>     dFinal = dFinal.union(ds);
> }
> dFinal.groupBy(0).reduce(...);
>
> Is there a more efficient way of doing the above task using java APIs?
> GroupReduce only works on a single dataset at a time and I can't find any
> other methods that take multiple datasets as an input parameter.
>
> Thanks,
> --
> Ritesh Kumar Singh
> https://riteshtoday.wordpress.com/
>


Re: Performing Reduce on a group of datasets

2016-05-19 Thread Fabian Hueske
I think that sentence is misleading and refers to the internals of Flink.
It should be removed, IMO.
You can only union two DataSets. If you want to union more, you have to do
it one by one.

Btw. union does not cause additional processing overhead.

Cheers, Fabian

2016-05-19 14:44 GMT+02:00 Ritesh Kumar Singh :

> Thanks for the reply Fabian,
>
> Though here's a small thing I found on the documentation page:
>
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/programming_guide.html#transformations
>
> If you look into the Union section, "This operation happens implicitly if
> more than one data set is used for a specific function input." , I'm not
> sure what this is supposed to mean. My initial assumption was something
> like:
>
> dFinal = dFinal.union( d1, d2, ... , dN); // passing more than one
> dataset as function input.
>
> But as expected, this does not satisfy the union method signature. And so
> is that line supposed to mean something else? Or is it a feature not
> supported by flink 0.8 but works with future releases?
>
> Thanks,
> --
> Ritesh Kumar Singh
> https://riteshtoday.wordpress.com/
>


Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)

2016-05-20 Thread Fabian Hueske
The problem seems to occur quite often.
Did you update your Flink version recently? If so, could you try to
downgrade and see if the problem disappears?

Is it otherwise possible that it is caused by faulty hardware?

2016-05-20 18:05 GMT+02:00 Flavio Pompermaier :

> This time (Europd instead of Europe):
>
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce 
> (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map (Key 
> Extractor)' , caused an error: Error obtaining the sorted input: Thread 
> 'SortMerger spilling thread' terminated due to an exception: The datetime 
> zone id 'Europd/Rome' is not recognised
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: 
> Thread 'SortMerger spilling thread' terminated due to an exception: The 
> datetime zone id 'Europd/Rome' is not recognised
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>   at 
> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>   at 
> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>   ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' 
> terminated due to an exception: The datetime zone id 'Europd/Rome' is not 
> recognised
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: java.lang.IllegalArgumentException: The datetime zone id 
> 'Europd/Rome' is not recognised
>   at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229)
>   at 
> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94)
>   at 
> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74)
>   at 
> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>   at 
> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>   at 
> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>   at 
> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>
>
>
> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier 
> wrote:
>
>> This time another error (rerialization instead of serialization):
>>
>> com.esotericsoftware.kryo.KryoException: Unable to find class: 
>> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo
>>  at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>>  at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>  at 
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>>  at 
>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>>  at 
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>  at 
>> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>>  at 
>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdapti

Re: keyBy on a collection of Pojos

2016-05-23 Thread Fabian Hueske
Actually, the program works correctly (according to the DataStream API).
Let me explain what happens:

1) You do not initialize the count variable, so it will be 0 (summing 0s
results in 0).
2) DataStreams are considered to be unbounded (of infinite size). keyBy
does not group the records because it would have to wait forever to close
a group due to the infinite input. Instead, keyBy basically partitions the
data.
3) By calling sum() on a KeyedStream you compute a running aggregate which
emits one record for each incoming record, summing the declared field (this
stays 0 because 0 + 0 = 0).

You will need to
1) initialize count to 1, and
2) define a window to discretize the stream into finite sets (windows) of
records, for example as in the sketch below.
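A minimal sketch (the 10-second tumbling processing-time window is an
assumption, not a requirement of your use case):

pojoExampleDataStream
        .keyBy("productId")
        .timeWindow(Time.seconds(10)) // discretize the stream into finite windows
        .sum("count")                 // count must be initialized to 1 per record
        .print();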

Cheers, Fabian

2016-05-23 17:16 GMT+02:00 Deepak Sharma :

> Can you try serializing your POJO ?
>
> Thanks
> Deepak
>
> On Mon, May 23, 2016 at 8:42 PM, Flavio Pompermaier 
> wrote:
>
>> Sorry Rami, you're right :)
>> Unfortunattely I've never used Flink streaming so I cannot be helpful
>> there..
>> Maybe it is something related to the default triggering policy of the
>> streaming environment?
>>
>>
>> On Mon, May 23, 2016 at 5:06 PM, Al-Isawi Rami > > wrote:
>>
>>> Thanks, setters and getters for public fields have no purpose. Also per
>>> the conditions you have mentioned:
>>> "All fields *either* have to be public *or* there must be getters and
>>> setters for all non-public fields.”
>>> Since my fields are declared public, there is no impact from adding
>>> getters and setters. (I have also tested after adding the setters and
>>> getters and, as expected, that has no effect.)
>>>
>>> Could you spot anything else? this should be really easy basic case. I
>>> am really wondering why it is not working.
>>>
>>> For the people who are lazy to open the gist code snippet, this is what
>>> I am trying to do:
>>>
>>> pojoExampleDataStream.
>>> keyBy("productId").
>>> sum("count").
>>> print();
>>>
>>>
>>>
>>> Regards,
>>> -Rami
>>>
>>>
>>> On 23 May 2016, at 17:11, Flavio Pompermaier 
>>> wrote:
>>>
>>> You don't have getters and setters for count and productId.
>>>
>>> Your class should be
>>>
>>> public class PojoExample {
>>> public int count;
>>> public String productId;
>>>
>>> public PojoExample() {}
>>>
>>> public int getCount() {
>>> return count;
>>> }
>>>
>>> public void setCount(int count) {
>>> this.count = count;
>>> }
>>>
>>> public String getProductId() {
>>> return productId;
>>> }
>>>
>>> public void setProductId(String productId) {
>>> this.productId = productId;
>>> }
>>> }
>>>
>>>
>>>
>>> On Mon, May 23, 2016 at 3:40 PM, Al-Isawi Rami <
>>> rami.al-is...@comptel.com> wrote:
>>>
 Thanks Flavio, but as you can see in my code I have already declared my
 pojo to achieve those conditions:
 public class PojoExample {
 public int count;
 public String productId;
 public PojoExample() {
 }
 }

 So it cannot be that.

 -Rami

 On 23 May 2016, at 16:30, Flavio Pompermaier 
 wrote:

 *Conditions* for a class to be treated as a POJO by Flink:

- The class must be public
- It must have a public constructor without arguments
- All fields either have to be public or there must be getters and
setters for all non-public fields. If the field name is foo the
getter and setters must be called getFoo() and setFoo().

 I don't know whether you need to implement also hashCode() and equals()
 actually
 Best,
 Flavio

 On Mon, May 23, 2016 at 3:24 PM, Al-Isawi Rami <
 rami.al-is...@comptel.com> wrote:

> Hi,
>
> I was trying to test some specific issue, but now I cannot seem to get
> the very basic case working. It is most likely that I am blind to
> something, would anyone have quick look at it?
> https://gist.github.com/rami-alisawi/d6ff33ae2d4d6e7bb1f8b329e3e5fa77
>
> It is just a collection of pojos where I am just trying to keyBy one
> field and sum into the other, but I am getting:
> 5> PojoExample{count=0, productId='productA'}
> 8> PojoExample{count=0, productId='productB'}
> 5> PojoExample{count=0, productId='productA'}
> 8> PojoExample{count=0, productId='productB'}
> 5> PojoExample{count=0, productId='productA'}
> 5> PojoExample{count=0, productId='productA'}
> 5> PojoExample{count=0, productId='productA’}
>
> Regards,
> -Rami
>
> Disclaimer: This message and any attachments thereto are intended
> solely for the addressed recipient(s) and may contain confidential
> information. If you are not the intended recipient, please notify the
> sender by reply e-mail and delete the e-mail (including any attachments
> thereto) without producing, distributing or retaining any copies thereof.
> Any review, dissemination or other use of, or taking of any action in
> reliance upon, this information by persons or entities other than the
> intended recipie

Re: writeAsCSV with partitionBy

2016-05-23 Thread Fabian Hueske
Hi Kirsti,

I'm not aware of anybody working on this issue.
Would you like to create a JIRA issue for it?

Best, Fabian

2016-05-23 16:56 GMT+02:00 KirstiLaurila :

> Are there any plans to implement this kind of feature (the possibility to
> write data to specified partitions) in the near future?
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/writeAsCSV-with-partitionBy-tp4893p7099.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Apache Beam and Flink

2016-05-26 Thread Fabian Hueske
No, that is not supported yet.
Beam provides a common API, but the Flink runner translates Beam programs against
batch sources into DataSet API programs and Beam programs against
streaming sources into DataStream API programs.
It is not possible to mix both.

2016-05-26 10:00 GMT+02:00 Ashutosh Kumar :

> Thanks. So if we use the Beam API with the Flink engine, can we get interaction
> between batch and stream? As far as I know, currently in Flink a DataSet and a
> DataStream cannot talk to each other. Is this correct?
>  Thanks
> Ashutosh
>
>
> On Thu, May 26, 2016 at 1:09 PM, Slim Baltagi  wrote:
>
>> Hi Ashutosh
>>
>> Apache Beam provides a Unified API for batch and streaming.
>> It also supports multiple ‘runners’: local, Apache Spark, Apache Flink
>> and Google Cloud Data Flow (commercial service).
>> It is not an alternative to Flink because it is an API and you still need
>> an execution engine.
>> It can be used as an alternative API to using the two Flink APIs :
>> DataSet API and DataStream API.
>> It can be complementary to Flink in the way that you use Beam as API and
>> Flink as the execution engine.
>> Many of Flink committers are also Apache Beam committers!
>> The following blogs describe why Apache Beam:
>>
>>-  from Flink perspective: http://data-artisans.com/why-apache-beam/
>>-  from Google perspective.
>>
>> https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective
>>
>>
>> A few recent resources about Apache Beam published this month: May 2016
>>
>>- Running Apache Beam (screencast)
>>https://www.youtube.com/watch?v=dwxUbzbwtyI
>>- Introduction to Apache Beam ( presentation)
>>https://skillsmatter.com/skillscasts/8036-apache-flink-may-meetup
>>- Introduction to Apache Beam ( blog)
>>http://www.talend.com/blog/2016/05/02/introduction-to-apache-beam
>>
>>
>> I hope this helps.
>>
>> Thanks
>>
>> Slim Baltagi
>>
>> On May 26, 2016, at 2:20 AM, Ashutosh Kumar 
>> wrote:
>>
>> How does apache beam fits with  flink ? Is it an alternative for flink or
>> complementary to it ?
>>
>> Thanks
>> Ashutosh
>>
>>
>>
>


Re: WindowedStream aggregation methods pre-aggregate?

2016-05-27 Thread Fabian Hueske
Hi Elias,

yes, reduce, fold, and the aggregation functions (sum, min, max, minBy,
maxBy) on WindowedStream perform eager aggregation, i.e., the functions are
applied for each value that enters the window and the state of the window
consists of a single value. In case you need access to the Window
object (e.g., to include the start or end time), there are overloaded
versions of apply that take a ReduceFunction or FoldFunction and an
additional WindowFunction. These versions eagerly apply the ReduceFunction or
FoldFunction and, when the window is closed, apply the WindowFunction to
the aggregated value (the iterator will serve a single value).
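A minimal Java sketch of the combined version (the Tuple2<String, Long> element
type and the one-minute window are assumptions):

stream
    .keyBy(0)
    .timeWindow(Time.minutes(1))
    .apply(
        new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                return new Tuple2<>(a.f0, a.f1 + b.f1); // eager pre-aggregation per element
            }
        },
        new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Long>> in,
                              Collector<Tuple3<String, Long, Long>> out) {
                Tuple2<String, Long> agg = in.iterator().next(); // single pre-aggregated value
                out.collect(new Tuple3<>(agg.f0, window.getStart(), agg.f1));
            }
        });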

Cheers, Fabian

2016-05-28 0:48 GMT+02:00 Elias Levy :

> Can someone confirm whether
> the org.apache.flink.streaming.api.scala.WindowedStream methods other than
> "apply" (e.g. "sum") perform pre-aggregation?  The API docs are silent on
> this.
>
>
>


Re: NotSerializableException

2016-06-09 Thread Fabian Hueske
Hi Tarandeep,

the exception suggests that Flink tries to serialize RecordsFilterer as a
user function (this happens via Java Serialization).
I said suggests because the code that uses RecordsFilterer is not included.

To me it looks like RecordsFilterer should not be used as a user function.
It is a helper class to construct a DataSet program, so it should not be
shipped for execution.
You would use such a class as follows:

DataSet<T> records = ...
DataSet<String> filterIDs = ...

RecordsFilterer<T> rf = new RecordsFilterer<>();
DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records, filterIDs,
"myField");

Regarding the join code, I would suggest an optimization.
Instead of using CoGroup, I would use distinct and an OuterJoin like this:

DataSet<String> distIds = filteredIds.distinct();
DataSet<Tuple2<Boolean, T>> result = records
  .leftOuterJoin(distIds)
  .where(KEYSELECTOR)
  .equalTo("*") // use the full string as the key
  .with(JOINFUNC) // set the Boolean to false if right == null, true otherwise
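A sketch of what KEYSELECTOR and JOINFUNC could look like (assuming the same
field-based key as in your CoGroup version; the anonymous classes are
illustrative and must be serializable, which relates to the original exception):

DataSet<Tuple2<Boolean, T>> flagged = records
        .leftOuterJoin(distIds)
        .where(new KeySelector<T, String>() {
            @Override
            public String getKey(T t) throws Exception {
                String s = (String) t.get(fieldName);
                return s != null ? s : UUID.randomUUID().toString();
            }
        })
        .equalTo("*") // the full String element is the key on the right side
        .with(new JoinFunction<T, String, Tuple2<Boolean, T>>() {
            @Override
            public Tuple2<Boolean, T> join(T record, String id) {
                return new Tuple2<>(id != null, record); // id == null means no matching filter id
            }
        });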

Best, Fabian

2016-06-09 2:28 GMT+02:00 Tarandeep Singh :

> Hi,
>
> I am getting NoSerializableException in this class-
>
> 
>
> public class RecordsFilterer<T extends GenericRecord> { // assuming Avro GenericRecord, for t.get(fieldName)
>
>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
>             DataSet<String> filteredIds, String fieldName) {
>         return dataset.coGroup(filteredIds)
>                 .where(new KeySelector<T, String>() {
>                     @Override
>                     public String getKey(T t) throws Exception {
>                         String s = (String) t.get(fieldName);
>                         return s != null ? s : UUID.randomUUID().toString();
>                     }
>                 })
>                 .equalTo((KeySelector<String, String>) s -> s)
>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
>                     @Override
>                     public void coGroup(Iterable<T> records, Iterable<String> ids,
>                             Collector<Tuple2<Boolean, T>> collector) throws Exception {
>                         boolean filterFlag = false;
>                         for (String id : ids) {
>                             filterFlag = true;
>                         }
>
>                         for (T record : records) {
>                             collector.collect(new Tuple2<>(filterFlag, record));
>                         }
>                     }
>                 });
>     }
> }
>
>
> What I am trying to do is write a generic code that will join Avro records
> (of different types) with String records and there is a match add a filter
> flag. This way I can use the same code for different Avro record types. But
> I am getting this exception-
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException:
> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
> at
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> at
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
> at
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
> at
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
> at
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
> at
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> at
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
> at
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: Records

Re: FlinkKafkaProducer API

2016-06-09 Thread Fabian Hueske
Hi Elias,

thanks for your feedback. I think those are good observations and
suggestions to improve the Kafka producers.
The best place to discuss such improvements is the dev mailing list.

Would you like to repost your mail there or open JIRAs where the discussion
about these changes can continue?

Thanks, Fabian

2016-06-09 3:58 GMT+02:00 Elias Levy :

> The FlinkKafkaProducer API seems more difficult to use than it should be.
>
> The API requires you pass it a SerializationSchema or a
> KeyedSerializationSchema, but the Kafka producer already has a
> serialization API.  Requiring a serializer in the Flink API precludes the
> use of the Kafka serializers.  For instance, they preclude the use of the
> Confluent KafkaAvroSerializer class that makes use of the Confluent Schema
> Registry.  Ideally, the serializer would be optional, so as to allow the
> Kafka producer serializers to handle the task.
>
> In addition, the KeyedSerializationSchema conflates message key extraction
> with key serialization.  If the serializer were optional, to allow the
> Kafka producer serializers to take over, you'd still need to extract a key
> from the message.
>
> And given that the key may not be part of the message you want to write to
> Kafka, an upstream step may have to package the key with the message to
> make both available to the sink, for instance in a tuple. That means you
> also need to define a method to extract the message to write to Kafka from
> the element passed into the sink by Flink.
>
> In summary, there should be separation of extraction of the key and
> message from the element passed into the sink from serialization, and the
> serialization step should be optional.
>
>
>


Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Fabian Hueske
Hi Ravikumar,

I'll try to answer your questions:
1) If you set the parallelism of a map function to 1, there will be only a
single instance of that function, regardless of whether it is executed locally
or remotely in a cluster.
2) Flink also supports aggregations (reduce, groupReduce, combine,
...). However, I do not see how this would help with a stateful map
function.
3) In Flink DataSet programs you usually construct the complete program and
call execute() after you have defined your sinks. There are two exceptions:
print() and collect(), which both add special sinks and immediately execute
your program. print() prints the result to the stdout of the submitting
client and collect() fetches a dataset as a collection.
4) I am not sure I understood your question. When you obtain an
ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvironment(),
the type of the returned environment depends on the context in which the
program is executed. It can be a local environment if it is executed from
within an IDE or a remote execution environment if the program is executed
via the CLI client and shipped to a remote cluster.
5) A map operator processes records one after the other, i.e., as a
sequence. If you need a certain order, you can call DataSet.sortPartition()
to locally sort the partition (see the sketch below).
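A minimal sketch for 5) (the Tuple2<Long, String> element type is an assumption):

// Order is org.apache.flink.api.common.operators.Order
DataSet<Tuple2<Long, String>> locallySorted = input
        .sortPartition(0, Order.ASCENDING); // sorts each partition locally by field 0, no shuffle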

Hope that helps,
Fabian

2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar 
:

> Hi Till, Thank you for your answer, I have couple of questions
>
> 1) Setting parallelism on a single map function in local is fine but on
> distributed will it work as local execution?
>
> 2) Is there any other way apart from setting parallelism? Like spark
> aggregate function?
>
> 3) Is it necessary that after transformations to call execute function? Or
> Execution starts as soon as it encounters a action (Similar to Spark)?
>
> 4) Can I create a global execution environment (Either local or
> distributed) for different Flink program in a module?
>
> 5) How to make the records come in sequence for a map or any other
> operator?
>
>
> Regards,
> Ravikumar
>
>
> On 8 June 2016 at 21:14, Till Rohrmann  wrote:
>
>> Hi Ravikumar,
>>
>> Flink's operators are stateful. So you can simply create a variable in
>> your mapper to keep the state around. But every mapper instance will have
>> it's own state. This state is determined by the records which are sent to
>> this mapper instance. If you need a global state, then you have to set the
>> parallelism to 1.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
>> ravikumar.hawal...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I have an DataSet which is roughly a record in a
>>> DataSet Or a file.
>>>
>>> Now I am using map transformation on this DataSet to compute a variable
>>> (coefficients of linear regression parameters and data structure used is a
>>> double[]).
>>>
>>> Now the issue is that, per record the variable will get updated and I am
>>> struggling to maintain state of this variable for the next record.
>>>
>>> In simple, for first record the variable values will be 0.0, and after
>>> first record the variable will get updated and I have to pass this updated
>>> variable for the second record and so on for all records in DataSet.
>>>
>>> Any suggestions on how to maintain state of a variable?
>>>
>>>
>>> Regards,
>>> Ravikumar
>>>
>>
>>
>


Re: Strange behavior of DataStream.countWindow

2016-06-09 Thread Fabian Hueske
Hi Yukun,

the problem is that the KeySelector is internally invoked multiple times.
Hence it must be deterministic, i.e., it must extract the same key for the
same object if invoked multiple times.
The documentation does not discuss this aspect and should be extended.
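If you need the data spread over n partitions but still want a deterministic
key, you can derive it from the element itself, e.g. (a sketch; the partition
count is an assumption):

private static class HashKeySelector<T> implements KeySelector<T, Integer> {
    private final int nPartitions;

    HashKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
    }

    @Override
    public Integer getKey(T value) {
        // same element -> same key, no matter how often the selector is invoked
        return (value.hashCode() & Integer.MAX_VALUE) % nPartitions;
    }
}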

Thanks for pointing out this issue.

Cheers,
Fabian


2016-06-09 13:19 GMT+02:00 Yukun Guo :

> I’m playing with the (Window)WordCount example from Flink QuickStart. I
> generate a DataStream consisting of 1000 Strings of random digits, which
> is windowed with a tumbling count window of 50 elements:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.java.functions.KeySelector;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.util.Collector;
> import java.util.Random;
>
> public class DigitCount {
>
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>
>         DataStream<String> text = env.fromElements(
> "14159265358979323846264338327950288419716939937510",
> "58209749445923078164062862089986280348253421170679",
> "82148086513282306647093844609550582231725359408128",
> "48111745028410270193852110555964462294895493038196",
> "44288109756659334461284756482337867831652712019091",
> "45648566923460348610454326648213393607260249141273",
> "72458700660631558817488152092096282925409171536436",
> "78925903600113305305488204665213841469519415116094",
> "33057270365759591953092186117381932611793105118548",
> "07446237996274956735188575272489122793818301194912",
> "98336733624406566430860213949463952247371907021798",
> "60943702770539217176293176752384674818467669405132",
> "00056812714526356082778577134275778960917363717872",
> "14684409012249534301465495853710507922796892589235",
> "42019956112129021960864034418159813629774771309960",
> "5187072113499837297804995105973173281609631859",
> "50244594553469083026425223082533446850352619311881",
> "71010003137838752886587533208381420617177669147303",
> "59825349042875546873115956286388235378759375195778",
> "18577805321712268066130019278766111959092164201989"
> );
>
>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>                 .flatMap(new Splitter())
>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>                     @Override
>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>                         return x.f0 % 2;
>                     }
>                 })
>                 .countWindow(50)
>                 .sum(1);
>
> digitCount.print();
> env.execute();
>
> }
>
>     public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>         @Override
>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>             for (String token : value.split("")) {
>                 if (token.length() == 0) {
>                     continue;
>                 }
>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>             }
>         }
>     }
> }
>
> The code above will produce 19 lines of output which is reasonable as the
> 1000 digits will be keyed into 2 partitions where one partition contains
> 500+ elements and the other contains slightly fewer than 500 elements,
> therefore as a result one 50-digit window is ignored.
>
> So far so good, but if I replace the mod KeySelector with a random one:
>
> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>     private int nPartitions;
>     private Random random;
>
>     RandomKeySelector(int nPartitions) {
>         this.nPartitions = nPartitions;
>         random = new Random();
>     }
>
>     @Override
>     public Integer getKey(T dummy) throws Exception {
>         return random.nextInt(this.nPartitions);
>     }
> }
>
> and then
>
> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>
> it may generate 17 or 18 lines of output. How could that happen? Moreover,
> if I set the number of partitions to 10, in theory the lines of output
> should be no fewer than 11, but actually it can be only 9.
>
> Please help me understand why countWindow behaves like this.
>


Re: Maxby() and KeyBy() question

2016-06-09 Thread Fabian Hueske
Hi,

you are computing a running aggregate, i.e., you're getting one output
record for each input record and the output record is the record with the
largest value observed so far.
If the record with the largest value is the first, the record is sent out
another time. This is what happened with Match3 in your example.

There are two ways to compute aggregates on streams: 1) a running aggregate,
as you just did, or 2) a windowed aggregate.
For a windowed aggregate, you need to specify a window. The window
can be time or count based (see the sketch below).
The following blog post should be a good introduction to Flink's window
support [1].
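A minimal sketch using a count window (countWindow(2) assumes exactly two
odds arrive per match, one from each stream):

DataStream<Informacion> maxLocal = src3
        .keyBy("nombrePartido")
        .countWindow(2)              // evaluate once both odds for a match have arrived
        .maxBy("cuotaGanadorLocal");

maxLocal.print();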

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html

2016-06-09 14:36 GMT+02:00 iñaki williams :

> Hi again!
>
> I am working with two DataStreams, and I want to get the maximum value from
> each pair of them, for example:
>
> //Informacion (matchName, LocalOdd, AwayOdd)
>
> Informacion info1= new Informacion("Match1", 1.10, 3.22);
> Informacion info2= new Informacion("Match2", 2.11, 1.10);
> Informacion info3= new Informacion("Match3", 4.10, 1.05);
>
> Informacion info11= new Informacion("Match1", 1.80, 2.20);
> Informacion info22= new Informacion("Match2", 3.10, 1.15);
> Informacion info33= new Informacion("Match3", 2.12, 1.25);
>
>
> DataStream src = see.fromElements(info1,info2, info3);
> DataStream src2 =
> see.fromElements(info11,info22,info33);
> DataStream src3= src.union(src2);
>
> DataStream maxLocal =
> src3.keyBy("nombrePartido").maxBy("cuotaGanadorLocal");
>
> maxLocal.print();
>
>
>
> Let's suppose that those are tennis matches with their names and their bet
> odds, and the name of the matches are the same on both streams, I mean
> Match1=Match1 , Match2=Match2  (Image that match 1 name is "Rafa Nadal
> - Roger Federer").
>
>
> I want to get the maximum localOdd from matches with the same name, the
> result of my code is:
>
>
> 1> Informacion [matchName=Match2, localOdd=2.11, awayOdd=1.1]
> 1> *Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
> 1> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
> 1>* Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]*
> 4> Informacion [matchName=Match1, localOdd=1.1, awayOdd=3.22]
> 4> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
>
> It seems like it is taking the biggest value from all the matches and not
> by keyed matches
>
>
> I am looking for this:
>
>
> Informacion [matchName=Match1, localOdd=1.8, awayOdd=2.2]
> Informacion [matchName=Match2, localOdd=3.1, awayOdd=1.15]
> Informacion [matchName=Match3, localOdd=4.1, awayOdd=1.05]
>
>
>
> How can I get it?
>
>
> Thanks in advanced
>
>
>
>
>


Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
Hi Christophe,

where does the backpressure appear? In front of the sink operator or before
the window operator?

In any case, I think you can improve your WindowFunction if you convert
parts of it into a FoldFunction.
The FoldFunction would take care of the statistics computation and the
WindowFunction would only assemble the result record including extracting
the start time of the window.

Then you could do:

ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
YourWindowFunction());

This is more efficient because the FoldFunction is eagerly applied whenever
a new element is added to a window. Hence, the window only holds a
single value (the SummaryStatistics) instead of all elements added to the
window. In contrast, the WindowFunction is called only when the window is finally
evaluated.
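A rough sketch of how the three arguments fit together for your job (types are
taken from your snippet; note that with this overload the WindowFunction
consumes and emits the fold result type, i.e., the SummaryStatistics):

ws.apply(
    new SummaryStatistics(), // initial value per window
    new FoldFunction<ANA, SummaryStatistics>() {
        @Override
        public SummaryStatistics fold(SummaryStatistics acc, ANA value) {
            acc.addValue(value.getValue()); // incremental aggregation per element
            return acc;
        }
    },
    new WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>() {
        @Override
        public void apply(Tuple key, TimeWindow window, Iterable<SummaryStatistics> in,
                          Collector<SummaryStatistics> out) {
            out.collect(in.iterator().next()); // single pre-aggregated value per window
        }
    });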

Hope this helps,
Fabian

2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> Hi,
>
> I am writing a program to read timeseries from HBase and do some daily
> aggregations (Flink streaming). For now I am just computing some average so
> not very consuming but my HBase read get slower and slower (I have few
> billions of points to read). The back pressure is almost all the time close
> to 1.
>
> I use custom timestamp:
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
> so I implemented a custom extractor based on:
> AscendingTimestampExtractor
>
> At the beginning I have 5M reads/s and after 15 min I have just 1M read/s
> then it get worse and worse. Even when I cancel the job, data are still
> being written in HBase (I did a sink similar to the example - with a cache
> of 100s of HBase Puts to be a bit more efficient).
>
> When I don't put a sink it seems to stay on 1M reads/s.
>
> Do you have an idea why ?
>
> Here is a bit of code if needed:
> final WindowedStream<ANA, Tuple, TimeWindow> ws = hbaseDS.keyBy(0)
> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
> .keyBy(0)
> .timeWindow(Time.days(1));
>
> final SingleOutputStreamOperator puts = ws.apply(new
> WindowFunction<ANA, Put, Tuple, TimeWindow>() {
>
> @Override
> public void apply(final Tuple key, final TimeWindow window, final
> Iterable<ANA> input,
> final Collector<Put> out) throws Exception {
>
> final SummaryStatistics summaryStatistics = new SummaryStatistics();
> for (final ANA ana : input) {
> summaryStatistics.addValue(ana.getValue());
> }
> final Put put = buildPut((String) key.getField(0), window.getStart(),
> summaryStatistics);
> out.collect(put);
> }
> });
>
> And how I started Flink on YARN :
> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
> -Dtaskmanager.network.numberOfBuffers=4096
>
> Thanks for any feedback!
>
> Christophe
>


Re: How to maintain the state of a variable in a map transformation.

2016-06-09 Thread Fabian Hueske
Hi,

1) Yes, that is correct. If you set the parallelism of an operator to 1, it
is only executed on a single node. It depends on your application whether you
need a global state or whether multiple local states are OK.
2) Flink programs follow the concept of a data flow. There is no communication
between parallel instances of a task, i.e., all four tasks of a MapOperator
with parallelism 4 cannot talk to each other. You might want to take a look
at Flink's iteration operators. With these you can feed data back into a
previous operator [1] (see the sketch after this list).
4) Yes, that should work.
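Regarding 2), a minimal sketch of a bulk iteration with the DataSet API (the
Double element type and the step function are illustrative):

IterativeDataSet<Double> loop = initial.iterate(10);    // at most 10 iterations
DataSet<Double> stepResult = loop.map(new MapFunction<Double, Double>() {
    @Override
    public Double map(Double x) {
        return x / 2;                                   // step function applied each round
    }
});
DataSet<Double> result = loop.closeWith(stepResult);    // feed the step result back into the loop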

[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/iterations.html

2016-06-09 15:01 GMT+02:00 Ravikumar Hawaldar 
:

> Hi Fabian, Thank you for your answers,
>
> 1) If there is only single instance of that function, then it will defeat
> the purpose of distributed correct me if I am wrong, so If I run
> parallelism with 1 on cluster does that mean it will execute on only one
> node?
>
> 2) I mean to say, when a map operator returns a variable, is there any
> other function which takes that updated variable and returns that to all
> instances of map?
>
> 3) Question Cleared.
>
> 4) My question was can I use same ExecutionEnvironment for all flink
> programs in a module.
>
> 5) Question Cleared.
>
>
> Regards
> Ravikumar
>
>
>
> On 9 June 2016 at 17:58, Fabian Hueske  wrote:
>
>> Hi Ravikumar,
>>
>> I'll try to answer your questions:
>> 1) If you set the parallelism of a map function to 1, there will be only
>> a single instance of that function regardless whether it is execution
>> locally or remotely in a cluster.
>> 2) Flink does also support aggregations, (reduce, groupReduce, combine,
>> ...). However, I do not see how this would help with a stateful map
>> function.
>> 3) In Flink DataSet programs you usually construct the complete program
>> and call execute() after you have defined your sinks. There are two
>> exceptions: print() and collect() which both add special sinks and
>> immediately execute your program. print() prints the result to the stdout
>> of the submitting client and collect() fetches a dataset as collection.
>> 4) I am not sure I understood your question. When you obtain an
>> ExecutionEnvironment with ExecutionEnvironment.getExecutionEnvrionment()
>> the type of the returned environment depends on the context in which the
>> program was executed. It can be a local environment if it is executed from
>> within an IDE or a RemodeExecutionEnvironment if the program is executed
>> via the CLI client and shipped to a remote cluster.
>> 5) A map operator processes records one after the other, i.e., as a
>> sequence. If you need a certain order, you can call DataSet.sortPartition()
>> to locally sort the partition.
>>
>> Hope that helps,
>> Fabian
>>
>> 2016-06-09 12:23 GMT+02:00 Ravikumar Hawaldar <
>> ravikumar.hawal...@gmail.com>:
>>
>>> Hi Till, Thank you for your answer, I have couple of questions
>>>
>>> 1) Setting parallelism on a single map function in local is fine but on
>>> distributed will it work as local execution?
>>>
>>> 2) Is there any other way apart from setting parallelism? Like spark
>>> aggregate function?
>>>
>>> 3) Is it necessary that after transformations to call execute function?
>>> Or Execution starts as soon as it encounters a action (Similar to Spark)?
>>>
>>> 4) Can I create a global execution environment (Either local or
>>> distributed) for different Flink program in a module?
>>>
>>> 5) How to make the records come in sequence for a map or any other
>>> operator?
>>>
>>>
>>> Regards,
>>> Ravikumar
>>>
>>>
>>> On 8 June 2016 at 21:14, Till Rohrmann  wrote:
>>>
>>>> Hi Ravikumar,
>>>>
>>>> Flink's operators are stateful. So you can simply create a variable in
>>>> your mapper to keep the state around. But every mapper instance will have
>>>> it's own state. This state is determined by the records which are sent to
>>>> this mapper instance. If you need a global state, then you have to set the
>>>> parallelism to 1.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Jun 8, 2016 at 5:08 PM, Ravikumar Hawaldar <
>>>> ravikumar.hawal...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I have an DataSet which is roughly a record in a
>>>>> DataSet Or a file.
>>>>>
>>>>> Now I am using map transformation on this DataSet to compute a
>>>>> variable (coefficients of linear regression parameters and data structure
>>>>> used is a double[]).
>>>>>
>>>>> Now the issue is that, per record the variable will get updated and I
>>>>> am struggling to maintain state of this variable for the next record.
>>>>>
>>>>> In simple, for first record the variable values will be 0.0, and after
>>>>> first record the variable will get updated and I have to pass this updated
>>>>> variable for the second record and so on for all records in DataSet.
>>>>>
>>>>> Any suggestions on how to maintain state of a variable?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Ravikumar
>>>>>
>>>>
>>>>
>>>
>>
>


Re: Data Source Generator emits 4 instances of the same tuple

2016-06-09 Thread Fabian Hueske
We solved this problem yesterday at the Flink Hackathon.
The issue was that the source function was started with parallelism 4 and
each parallel instance read the whole file, so every record was emitted four times.
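Two ways to avoid this (a sketch; TaxiRideSource, TaxiRide, and inputPath are
placeholders for your source class and input):

// Option 1: explicitly run the source with a single parallel instance
DataStream<TaxiRide> rides = env
        .addSource(new TaxiRideSource(inputPath))
        .setParallelism(1);

// Option 2: implement SourceFunction instead of ParallelSourceFunction;
// non-parallel sources are always executed with parallelism 1.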

Cheers, Fabian

2016-06-06 16:53 GMT+02:00 Biplob Biswas :

> Hi,
>
> I tried streaming the source data 2 ways
>
> 1. Is a simple straight forward way of sending data without using the
> serving speed concept
> http://pastebin.com/cTv0Pk5U
>
>
> 2. The one where I use the TaxiRide source which is exactly similar except
> loading the data in the proper data structures.
> http://pastebin.com/NenvXShH
>
>
> I hope to get a solution out of it.
>
> Thanks and Regards
> Biplob Biswas
>
>
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Data-Source-Generator-emits-4-instances-of-the-same-tuple-tp7392p7405.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: FlinkKafkaProducer API

2016-06-09 Thread Fabian Hueske
Great, thank you!

2016-06-09 17:38 GMT+02:00 Elias Levy :

>
> On Thu, Jun 9, 2016 at 5:16 AM, Fabian Hueske  wrote:
>
>> thanks for your feedback. I think those are good observations and
>> suggestions to improve the Kafka producers.
>> The best place to discuss such improvements is the dev mailing list.
>>
>> Would like to repost your mail there or open JIRAs where the discussion
>> about these changes can continue?
>
>
> I opened FLINK-4050.  Since the JIRAs are posted to the dev list, I won't
> cross post.
>
> Cheers,
> Elias
>
>
>


Re: HBase reads and back pressure

2016-06-09 Thread Fabian Hueske
OK, this indicates that the operator following the source is a bottleneck.

If that's the WindowOperator, it makes sense to try the refactoring of the
WindowFunction.
Alternatively, you can try to run that operator with a higher parallelism.

2016-06-09 17:39 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> Hi Fabian,
>
> Thanks for the help, I will try that. The backpressure was on the source
> (HBase).
>
> Christophe
>
> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>
>> Hi Christophe,
>>
>> where does the backpressure appear? In front of the sink operator or
>> before the window operator?
>>
>> In any case, I think you can improve your WindowFunction if you convert
>> parts of it into a FoldFunction.
>> The FoldFunction would take care of the statistics computation and the
>> WindowFunction would only assemble the result record including extracting
>> the start time of the window.
>>
>> Then you could do:
>>
>> ws.apply(new SummaryStatistics(), new YourFoldFunction(), new
>> YourWindowFunction());
>>
>> This is more efficient because the FoldFunction is eagerly applied when
>> ever a new element is added to a window. Hence, the window does only hold a
>> single value (SummaryStatistics) instead of all element added to the
>> window. In contrast the WindowFunction is called when the window is finally
>> evaluated.
>>
>> Hope this helps,
>> Fabian
>>
>> 2016-06-09 14:53 GMT+02:00 Christophe Salperwyck <
>> christophe.salperw...@gmail.com>:
>>
>>> Hi,
>>>
>>> I am writing a program to read timeseries from HBase and do some daily
>>> aggregations (Flink streaming). For now I am just computing some average so
>>> not very consuming but my HBase read get slower and slower (I have few
>>> billions of points to read). The back pressure is almost all the time close
>>> to 1.
>>>
>>> I use custom timestamp:
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> so I implemented a custom extractor based on:
>>> AscendingTimestampExtractor
>>>
>>> At the beginning I have 5M reads/s and after 15 min I have just 1M
>>> read/s then it get worse and worse. Even when I cancel the job, data are
>>> still being written in HBase (I did a sink similar to the example - with a
>>> cache of 100s of HBase Puts to be a bit more efficient).
>>>
>>> When I don't put a sink it seems to stay on 1M reads/s.
>>>
>>> Do you have an idea why ?
>>>
>>> Here is a bit of code if needed:
>>> final WindowedStream ws = hbaseDS.keyBy(0)
>>> .assignTimestampsAndWatermarks(new AscendingTimestampExtractor())
>>> .keyBy(0)
>>> .timeWindow(Time.days(1));
>>>
>>> final SingleOutputStreamOperator puts = ws.apply(new
>>> WindowFunction() {
>>>
>>> @Override
>>> public void apply(final Tuple key, final TimeWindow window, final
>>> Iterable input,
>>> final Collector out) throws Exception {
>>>
>>> final SummaryStatistics summaryStatistics = new SummaryStatistics();
>>> for (final ANA ana : input) {
>>> summaryStatistics.addValue(ana.getValue());
>>> }
>>> final Put put = buildPut((String) key.getField(0), window.getStart(),
>>> summaryStatistics);
>>> out.collect(put);
>>> }
>>> });
>>>
>>> And how I started Flink on YARN :
>>> flink-1.0.3/bin/yarn-session.sh -n 20 -tm 16384 -s 2
>>> -Dtaskmanager.network.numberOfBuffers=4096
>>>
>>> Thanks for any feedback!
>>>
>>> Christophe
>>>
>>
>>
>


Re: Strange behavior of DataStream.countWindow

2016-06-13 Thread Fabian Hueske
If I understood you correctly, you want to compute windows in parallel
without using a key.
Are you aware that the result of such a computation is not deterministic
and kind of arbitrary?

If that is still OK for you, you can use a mapper to assign the current
parallel index as a key field, i.e., wrap the data in a Tuple2<Integer, T> and
then do a keyBy(0). This will keep the data local. The mapper
should extend RichMapFunction. You can access the parallel index via
getRuntimeContext().getIndexOfThisSubtask() (see the sketch below).
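A minimal sketch of such a mapper (the generic element type T and the class
name are illustrative):

public static class AssignSubtaskKey<T> extends RichMapFunction<T, Tuple2<Integer, T>> {
    @Override
    public Tuple2<Integer, T> map(T value) {
        // the subtask index is constant per parallel instance, so keyBy(0) keeps data local
        return new Tuple2<>(getRuntimeContext().getIndexOfThisSubtask(), value);
    }
}

// usage: stream.map(new AssignSubtaskKey<MyType>()).keyBy(0).countWindow(50).sum(1);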

Hope this helps.
Cheers, Fabian

2016-06-11 11:53 GMT+02:00 Yukun Guo :

> Thx, now I use element.hashCode() % nPartitions and it works as expected.
>
> But I'm afraid it's not a best practice for just turning a plain (already
> paralellized) DataStream into a KeyedStream? Because it introduces some
> overhead due to physical repartitioning by key, which is unnecessary since
> I don't really care about keys.
>
> On 9 June 2016 at 22:00, Fabian Hueske  wrote:
>
>> Hi Yukun,
>>
>> the problem is that the KeySelector is internally invoked multiple times.
>> Hence it must be deterministic, i.e., it must extract the same key for
>> the same object if invoked multiple times.
>> The documentation is not discussing this aspect and should be extended.
>>
>> Thanks for pointing out this issue.
>>
>> Cheers,
>> Fabian
>>
>>
>> 2016-06-09 13:19 GMT+02:00 Yukun Guo :
>>
>>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>>> generate a DataStream consisting of 1000 Strings of random digits,
>>> which is windowed with a tumbling count window of 50 elements:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;import 
>>> org.apache.flink.api.java.functions.KeySelector;import 
>>> org.apache.flink.api.java.tuple.Tuple2;import 
>>> org.apache.flink.streaming.api.datastream.DataStream;import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import
>>>  org.apache.flink.util.Collector;
>>> import java.util.Random;
>>> public class DigitCount {
>>>
>>>
>>> public static void main(String[] args) throws Exception {
>>> final StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> DataStream text = env.fromElements(
>>> "14159265358979323846264338327950288419716939937510",
>>> "58209749445923078164062862089986280348253421170679",
>>> "82148086513282306647093844609550582231725359408128",
>>> "48111745028410270193852110555964462294895493038196",
>>> "44288109756659334461284756482337867831652712019091",
>>> "45648566923460348610454326648213393607260249141273",
>>> "72458700660631558817488152092096282925409171536436",
>>> "78925903600113305305488204665213841469519415116094",
>>> "33057270365759591953092186117381932611793105118548",
>>> "07446237996274956735188575272489122793818301194912",
>>> "98336733624406566430860213949463952247371907021798",
>>> "60943702770539217176293176752384674818467669405132",
>>> "00056812714526356082778577134275778960917363717872",
>>> "14684409012249534301465495853710507922796892589235",
>>> "42019956112129021960864034418159813629774771309960",
>>> "5187072113499837297804995105973173281609631859",
>>> "50244594553469083026425223082533446850352619311881",
>>> "71010003137838752886587533208381420617177669147303",
>>> "59825349042875546873115956286388235378759375195778",
>>> "18577805321712268066130019278766111959092164201989"
>>> );
>>>
>>> DataStream> digitCount = text
>>> .flatMap(new Splitter())
>>> .keyBy(new KeySelector, Integer>() 
>>> {
>>> @Override
>>> public Integer getKey(Tuple2 x) 
>>> throws Exception {
>>> return x.f0 % 2;
>>> }
>>> })
>>> .countWindow(50)
>>> .sum(1);
>&

Re: HBase reads and back pressure

2016-06-13 Thread Fabian Hueske
Do the backpressure metrics indicate that the sink function is blocking?

2016-06-13 16:58 GMT+02:00 Christophe Salperwyck <
christophe.salperw...@gmail.com>:

> To continue, I implemented the ws.apply(new SummaryStatistics(), new
> YourFoldFunction(), new YourWindowFunction());
>
> It works fine when there is no sink, but when I put an HBase sink it seems
> that the sink, somehow, blocks the flow. The sink writes very little data
> into HBase and when I limit my input to just few sensors, it works well. Any
> idea?
>
> final SingleOutputStreamOperator aggregates = ws
> .apply(
> new Aggregate(),
> new FoldFunction<ANA, Aggregate>() {
>
> @Override
> public Aggregate fold(final Aggregate accumulator, final ANA value) throws
> Exception {
> accumulator.addValue(value.getValue());
> return accumulator;
> }
> },
> new WindowFunction<Aggregate, Aggregate, Tuple, TimeWindow>() {
>
> @Override
> public void apply(final Tuple key, final TimeWindow window, final
> Iterable<Aggregate> input,
> final Collector<Aggregate> out) throws Exception {
> for (final Aggregate aggregate : input) {
> aggregate.setM((String) key.getField(0));
> aggregate.setTime(window.getStart());
> out.collect(aggregate);
> }
> }
> });
> aggregates.
> setParallelism(10).
> writeUsingOutputFormat(new OutputFormat<Aggregate>() {
> private static final long serialVersionUID = 1L;
> HBaseConnect hBaseConnect;
> Table table;
> final int flushSize = 100;
> List<Put> puts = new ArrayList<>();
> @Override
> public void writeRecord(final Aggregate record) throws IOException {
> puts.add(record.buildPut());
> if (puts.size() == flushSize) {
> table.put(puts);
> }
> }
> @Override
> public void open(final int taskNumber, final int numTasks) throws
> IOException {
> hBaseConnect = new HBaseConnect();
> table = hBaseConnect.getHTable("PERF_TEST");
> }
> @Override
> public void configure(final Configuration parameters) {
> // TODO Auto-generated method stub
> }
> @Override
> public void close() throws IOException {
> //last inserts
> table.put(puts);
> table.close();
> hBaseConnect.close();
> }
> });
>
> 2016-06-13 13:47 GMT+02:00 Maximilian Michels :
>
>> Thanks!
>>
>> On Mon, Jun 13, 2016 at 12:34 PM, Christophe Salperwyck
>>  wrote:
>> > Hi,
>> > I vote on this issue and I agree this would be nice to have.
>> >
>> > Thx!
>> > Christophe
>> >
>> > 2016-06-13 12:26 GMT+02:00 Aljoscha Krettek :
>> >>
>> >> Hi,
>> >> I'm afraid this is currently a shortcoming in the API. There is this
>> open
>> >> Jira issue to track it:
>> https://issues.apache.org/jira/browse/FLINK-3869. We
>> >> can't fix it before Flink 2.0, though, because we have to keep the API
>> >> stable on the Flink 1.x release line.
>> >>
>> >> Cheers,
>> >> Aljoscha
>> >>
>> >> On Mon, 13 Jun 2016 at 11:04 Christophe Salperwyck
>> >>  wrote:
>> >>>
>> >>> Thanks for the feedback and sorry that I can't try all this straight
>> >>> away.
>> >>>
>> >>> Is there a way to have a different function than:
>> >>> WindowFunction<SummaryStatistics, SummaryStatistics, Tuple, TimeWindow>()
>> >>>
>> >>> I would like to return a HBase Put and not a SummaryStatistics. So
>> >>> something like this:
>> >>> WindowFunction<SummaryStatistics, Put, Tuple, TimeWindow>()
>> >>>
>> >>> Christophe
>> >>>
>> >>> 2016-06-09 17:47 GMT+02:00 Fabian Hueske :
>> >>>>
>> >>>> OK, this indicates that the operator following the source is a
>> >>>> bottleneck.
>> >>>>
>> >>>> If that's the WindowOperator, it makes sense to try the refactoring
>> of
>> >>>> the WindowFunction.
>> >>>> Alternatively, you can try to run that operator with a higher
>> >>>> parallelism.
>> >>>>
>> >>>> 2016-06-09 17:39 GMT+02:00 Christophe Salperwyck
>> >>>> :
>> >>>>>
>> >>>>> Hi Fabian,
>> >>>>>
>> >>>>> Thanks for the help, I will try that. The backpressure was on the
>> >>>>> source (HBase).
>> >>>>>
>> >>>>> Christophe
>> >>>>>
>> >>>>> 2016-06-09 16:38 GMT+02:00 Fabian Hueske :
>> >>>>>>
>> >>>>>> Hi Christophe,
>> >>>>>>
>> >>>>>> where does

Re: Kinesis connector classpath issue when running Flink 1.1-SNAPSHOT on YARN

2016-06-17 Thread Fabian Hueske
Hi Josh,

I assume that you build the SNAPSHOT version yourself. I had similar
version conflicts for Apache HttpCore with Flink SNAPSHOT versions on EMR.
The problem is caused by a changed behavior in Maven 3.3 and following
versions.
Due to these changes, the dependency shading is not working correctly.
That's why we use Maven 3.2 to build the Flink release artifacts.

Can you check whether you used Maven 3.3 and try to downgrade to 3.2 if
that was the case?

Cheers, Fabian

2016-06-17 8:12 GMT+02:00 Tai Gordon :

> Hi Josh,
>
> I’m looking into the problem. Seems like the connector is somehow using
> older versions of httpclient.
> Can you print the loaded class path at runtime, and check the path &
> version of the loaded httpclient / httpcore dependency?
> i.e. `classOf[HttpConnectionParams].getResource("
> HttpConnectionParams.class").toString`
>
> Also, on which commit was your kinesis connector built?
>
> Regards,
> Gordon
>
>
> On June 17, 2016 at 1:08:37 AM, Josh (jof...@gmail.com) wrote:
>
> Hey,
>
> I've been running the Kinesis connector successfully now for a couple of
> weeks, on a Flink cluster running Flink 1.0.3 on EMR 2.7.1/YARN.
>
> Today I've been trying to get it working on a cluster running the current
> Flink master (1.1-SNAPSHOT) but am running into a classpath issue when
> starting the job. This only happens when running on EMR/YARN (it's fine
> when running 1.1-SNAPSHOT locally, and when running 1.0.3 on EMR)
>
> 
>  The program finished with the following exception:
>
> java.lang.NoSuchMethodError:
> org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V
> at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96)
> at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:187)
> at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136)
> at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:120)
> at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:157)
> at com.amazonaws.services.kinesis.AmazonKinesisClient.<init>(AmazonKinesisClient.java:137)
> at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:76)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:166)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:140)
> at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.<init>(FlinkKinesisConsumer.java:123)
> ---
>
> Any ideas what's going on?
>
> The job I'm deploying has httpclient 4.3.6 and httpcore 4.3.3 which I
> believe are the libraries with the HttpConnectionParams class.
>
> Thanks,
> Josh
>
>


Re: Optimizations not performed - please confirm

2016-06-29 Thread Fabian Hueske
Yes, that was my fault. I'm used to auto reply-all on my desktop machine,
but my phone just did a simple reply.
Sorry for the confusion,
Fabian



2016-06-29 19:24 GMT+02:00 Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr>:

> Thank you, Aljoscha!
> I received a similar update from Fabian, only now I see the user list was
> not in CC.
>
> Fabian::*The optimizer hasn’t been touched (except for bugfixes and new
> operators) for quite some time.*
> *These limitations are still present and I don’t expect them to be removed
> anytime soon. IMO, it is more likely that certain optimizations like join
> reordering will be done for Table API / SQL queries by the Calcite
> optimizer and pushed through the Flink Dataset optimizer.*
>
> I agree, for join reordering optimisations it makes sense to rely on
> Calcite.
> My goal is to understand how current documentation correlates to the
> Flink’s framework status.
>
> I did an experimental study where I compared Flink and Spark for many
> workloads at very large scale (I’ll share the results soon) and I would
> like to develop a few ideas on top of Flink (from the results Flink is the
> winner in most of the use cases and it is our choice for the platform on
> which to develop and grow).
>
> My interest is in understanding more about Flink today. I am familiar with
> most of the papers written, I am watching the documentation also.
> I am looking at the DataSet API, runtime and current architecture.
>
> Best,
> Ovidiu
>
> On 29 Jun 2016, at 17:27, Aljoscha Krettek  wrote:
>
> Hi,
> I think this document is still up-to-date since not much was done in these
> parts of the code for the 1.0 release and after that.
>
> Maybe Timo can give some insights into what optimizations are done in the
> Table API/SQL that will be released in an updated version in 1.1.
>
> Cheers,
> Aljoscha
>
> +Timo, Explicitly adding Timo
>
> On Tue, 28 Jun 2016 at 21:41 Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr> wrote:
>
>> Hi,
>>
>> The optimizer internals described in this document [1] are probably not
>> up-to-date.
>> Can you please confirm if this is still valid:
>>
>> *“The following optimizations are not performed*
>>
>>- *Join reordering (or operator reordering in general): Joins /
>>Filters / Reducers are not re-ordered in Flink. This is a high opportunity
>>optimization, but with high risk in the absence of good estimates about 
>> the
>>data characteristics. Flink is not doing these optimizations at this 
>> point.*
>>- *Index vs. Table Scan selection: In Flink, all data sources are
>>always scanned. The data source (the input format) may apply clever
>>mechanism to not scan all the data, but pre-select and project. Examples
>>are the RCFile / ORCFile / Parquet input formats."*
>>
>> Any update of this page will be very helpful.
>>
>> Thank you.
>>
>> Best,
>> Ovidiu
>> [1] https://cwiki.apache.org/confluence/display/FLINK/Optimizer+Internals
>>
>
>


Re: [ANNOUNCE] Flink 1.1.0 Released

2016-08-09 Thread Fabian Hueske
Thanks Ufuk and everybody who contributed to the release!

Cheers, Fabian

2016-08-08 20:41 GMT+02:00 Henry Saputra :

> Great work all. Great Thanks to Ufuk as RE :)
>
> On Monday, August 8, 2016, Stephan Ewen  wrote:
>
> > Great work indeed, and big thanks, Ufuk!
> >
> > On Mon, Aug 8, 2016 at 6:55 PM, Vasiliki Kalavri <
> > vasilikikala...@gmail.com >
> > wrote:
> >
> > > yoo-hoo finally announced 🎉
> > > Thanks for managing the release Ufuk!
> > >
> > > On 8 August 2016 at 18:36, Ufuk Celebi >
> > wrote:
> > >
> > > > The Flink PMC is pleased to announce the availability of Flink 1.1.0.
> > > >
> > > > On behalf of the PMC, I would like to thank everybody who contributed
> > > > to the release.
> > > >
> > > > The release announcement:
> > > > http://flink.apache.org/news/2016/08/08/release-1.1.0.html
> > > >
> > > > Release binaries:
> > > > http://apache.openmirror.de/flink/flink-1.1.0/
> > > >
> > > > Please update your Maven dependencies to the new 1.1.0 version and
> > > > update your binaries.
> > > >
> > > > – Ufuk
> > > >
> > >
> >
>


Re: Complex batch workflow needs (too) much time to create executionPlan

2016-08-22 Thread Fabian Hueske
Hi Markus,

you might be right that a lot of time is spent in optimization.
The optimizer enumerates all alternatives and chooses the plan with the
least estimated cost. The degrees of freedom of the optimizer are rather
restricted (execution strategies and the used partitioning & sorting keys;
the order of operators is not optimized). However, your plan consists of
more than 250 operators, which is pretty large (in fact, I haven't seen a
Flink plan of this size yet).
I assume that this is only one part of the program that exceeded the 20
minutes of optimization, correct?

In order to verify that Flink is stuck in the optimization phase, you could
take a stacktrace to see which methods the Java process currently executes.

One way to improve the optimization time is to set a few JoinHints to
reduce the degrees of freedom and number of enumerated plans.
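For illustration, a minimal sketch of passing a JoinHint in the DataSet API
(the data sets and key positions below are placeholders, not taken from your job):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<Long, String>> small = env.fromElements(Tuple2.of(1L, "a"));
DataSet<Tuple2<Long, String>> large = env.fromElements(Tuple2.of(1L, "b"));

// The hint pins the execution strategy for this join, so the optimizer
// does not have to enumerate alternatives for it.
small.join(large, JoinHint.BROADCAST_HASH_FIRST)
     .where(0)
     .equalTo(0)
     .print();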

Hope this helps,
Fabian

2016-08-22 13:50 GMT+02:00 Markus Nentwig :

> Hello Flink community,
>
> I created a slightly long batch workflow for my use case of clustering
> vertices using
> Flink and Gelly. Executing each of the workflow parts individually (and
> writing
> intermediate results to disk) works as expected.
>
> When combining workflow parts to longer jobs, I noticed that the time
> between
> 'Job Name' time and the actual 'Start Time' in the Flink Dashboard differ.
> With
> longer workflow chains the time difference gets bigger and bigger.  At this
> point, I think that Flink is creating the execution plan which is executed
> directly afterwards. As an example (90% of the workflow combined), I 'wait'
> for
> the execution plan for 77-78 seconds, then the job is accepted for
> execution
> and
> needs another 7-9 seconds to process a small test dataset (<8k vertices
> with
> property values and edges) - each run repeated 3 times. If running only
> env.getExecutionPlan() I will wait similar time for the execution plan. I
> added the
> JSON execution plan to this post. For bigger datasets the execution plan
> creation time and the job execution time grows as well in my scenario.
>
> When I now add a vertex centric iteration to my workflow and start the
> Flink
> job, I don't get a result at all: I stopped the job
> (print execution plan to log) at the following point:
>
> - waited > 20 hours after 'flink run ...'
> - two cores on my machine are at 100% all the time working on the flink job
> - no entry in Flink dashboard at all
> - no entry in log file after these lines:
>
> ###
> [...]
> org.apache.flink.client.CliFrontend- Starting execution of
> program
> org.apache.flink.client.program.Client - Starting program in
> interactive mode
> org.apache.flink.api.java.ExecutionEnvironment - The job has 2 registered
> types and 0 default Kryo serializers
> org.apache.flink.optimizer.Optimizer   - The parallelism of nested
> dataflows (such as step functions in iterations) is currently fixed to the
> parallelism of the surrounding operator (the iteration).
> ###
>
> Most likely the workflow could be optimized in many ways to need less time
> at
> certain points (yes, I am not a Flink expert in many places), but I think
> that
> long/complex workflows would still suffer of problems like this.
> Due to the fact that every single step is producing output (and some
> combined
> parts of the workflow do so, too), I currently suspect the Flink optimizer
> /
> execution plan creation to be the problem and therefore ask anyone here if
> you
> have experience with similar behavior. Any suggestions how I could
> successfully
> run long/complex workflows without running into such problems? ;)
>
> If there is not (an instant) 'solution' to the problem I would be still
> interested in opinions and ideas, thanks in advance!
>
> Best,
> Markus
>
> complexExecPlan.json
>  n4.nabble.com/file/n8596/complexExecPlan.json>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Complex-batch-
> workflow-needs-too-much-time-to-create-executionPlan-tp8596.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Batch jobs with a very large number of input splits

2016-08-23 Thread Fabian Hueske
Hi Niels,

yes, in YARN mode, the default parallelism is the number of available slots.

You can change the default task parallelism like this:

1) Use the -p parameter when submitting a job via the CLI client [1]
2) Set a parallelism on the execution environment: env.setParallelism()
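For example, a minimal sketch of option 2 (the data and the map function are
just placeholders):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// default parallelism for all operators of this job
env.setParallelism(4);

DataSet<String> text = env.fromElements("a", "b", "c");
text.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    })
    // an individual operator can still override the default
    .setParallelism(2)
    .print();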

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-master/apis/cli.html
[2]
https://ci.apache.org/projects/flink/flink-docs-master/apis/common/index.html#execution-environment-level


2016-08-23 10:29 GMT+02:00 Niels Basjes :

> I did more digging and finally understand what goes wrong.
> I create a yarn-session with 50 slots.
> Then I run my job that (due to the fact that my HBase table has 100s of
> regions) has a lot of inputsplits.
> The job then runs with parallelism 50 because I did not specify the value.
> As a consequence the second job I start in the same yarn-session is faced
> with 0 available task slots and fails with this exception:
>
> 08/23/2016 09:58:52 Job execution switched to status FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Not enough free slots available to run the job. You can decrease the
> operator parallelism or increase the number of slots per TaskManager in the
> configuration. Task to schedule: .. Resources available to scheduler:
> Number of instances=5, total number of slots=50, available slots=0
>
> So my conclusion for now is that if you want to run batch jobs in
> yarn-session then you MUST specify the parallelism for all steps or
> otherwise it will fill the yarn-session completely and you cannot run
> multiple jobs in parallel.
>
> Is this conclusion correct?
>
> Niels Basjes
>
>
> On Fri, Aug 19, 2016 at 3:18 PM, Robert Metzger 
> wrote:
>
>> Hi Niels,
>>
>> In Flink, you don't need one task per file, since splits are assigned
>> lazily to reading tasks.
>> What exactly is the error you are getting when trying to read that many
>> input splits? (Is it on the JobManager?)
>>
>> Regards,
>> Robert
>>
>> On Thu, Aug 18, 2016 at 1:56 PM, Niels Basjes  wrote:
>>
>>> Hi,
>>>
>>> I'm working on a batch process using Flink and I ran into an interesting
>>> problem.
>>> The number of input splits in my job is really really large.
>>>
>>> I currently have a HBase input (with more than 1000 regions) and in the
>>> past I have worked with MapReduce jobs doing 2000+ files.
>>>
>>> The problem I have is that if I run such a job in a "small" yarn-session
>>> (i.e. less than 1000 tasks) I get a fatal error indicating that there are
>>> not enough resources.
>>> For a continuous streaming job this makes sense, yet for a batch job
>>> (like I'm having) this is an undesirable error.
>>>
>>> For my HBase situation I currently have a workaround by overriding the
>>> creatInputSplits method from the TableInputFormat and thus control the
>>> input splits that are created.
>>>
>>> What is the correct way to solve this (no my cluster is NOT big enough
>>> to run that many parallel tasks) ?
>>>
>>>
>>> --
>>> Best regards / Met vriendelijke groeten,
>>>
>>> Niels Basjes
>>>
>>
>>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>


Re: Joda exclude in java quickstart maven archetype

2016-08-29 Thread Fabian Hueske
Hi Flavio,

yes, Joda should not be excluded.
This will be fixed in Flink 1.1.2.

Cheers, Fabian


2016-08-29 11:00 GMT+02:00 Flavio Pompermaier :

> Hi to all,
> I've tried to  upgrade from Flink 1.0.2 to 1.1.1 so I've copied the
> excludes of the maven shade plugin from the java quickstart pom but it
> includes the exclude of joda (that is not contained in the flink-dist
> jar).  That causes my job to fail.
> Shouldn't it be removed from the exclude list?
>
> Best,
> Flavio
>


Re: CountTrigger FIRE or FIRE_AND_PURGE

2016-08-29 Thread Fabian Hueske
Hi Paul,

This blog post [1] includes an example of an early trigger that should
pretty much do what you are looking for.
This one [2] explains the windowing mechanics of Flink (window assigner,
trigger, function, etc).

Hope this helps,
Fabian

[1]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
[2] http://flink.apache.org/news/2015/12/04/Introducing-windows.html

2016-08-30 0:25 GMT+02:00 Paul Joireman :

> Hi all,
>
>
> I'm attempting to use a long SlidingEventTime window (duration 24 hours) but
> I would like updates more frequently than the 24 hour length.  I
> naively attempted to use a simple CountTrigger(10) to give me the window
> every time 10 samples are collected; however, the window processing
> function I'm using only seems to get the latest 10, not the whole window
> (which is what I was hoping for).   The code looks like it simply fires
> after the count is reached, but it seems like it is doing a FIRE and PURGE;
> I can't seem to use the iterator in the window processing function to get
> more than 10 elements at a time.  Is there something I'm missing in order
> to get at the full content of the window data?
>
>
> Paul
>


Re: NoClassDefFoundError with ElasticsearchSink on Yarn

2016-09-01 Thread Fabian Hueske
Hi Steffen,

this looks like a Guava version mismatch to me.
Are you running exactly the same program on your local machine or did you
add dependencies to run it on the cluster (e.g. Kinesis)?
Maybe Kinesis and Elasticsearch are using different Guava versions?

Best, Fabian

2016-09-01 10:45 GMT+02:00 Hausmann, Steffen :

> Hi there,
>
> I’m running a flink program that reads from a Kinesis stream and
> eventually writes to an Elasticsearch2 sink. When I’m running the program
> locally from the IDE, everything seems to work fine, but when I’m executing
> the same program on an EMR cluster with Yarn, a NoClassDefFoundError occurs:
>
> java.lang.NoSuchMethodError: com.google.common.util.concurrent.MoreExecutors.directExecutor()Ljava/util/concurrent/Executor;
>
> at org.elasticsearch.threadpool.ThreadPool.<init>(ThreadPool.java:190)
>
> at org.elasticsearch.client.transport.TransportClient$Builder.build(TransportClient.java:133)
>
> at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:164)
>
> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
>
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>
> at java.lang.Thread.run(Thread.java:745)
>
> I’ve installed flink on an EMR cluster from the binary distribution
> flink-1.1.1-bin-hadoop27-scala_2.10.tgz. I build the jar file locally
> with mvn clean package(I’ve attached the pom.xml for reference) and copy it
> to the cluster for execution. There is a thread on this list that seems to
> be related, but I’m afraid I couldn’t draw any conclusions from it:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.
> nabble.com/classpath-issue-on-yarn-tt6442.html#none
>
>
> Any idea, what’s wrong?
>
> Thanks,
>
> Steffen
>
>


Re: CountTrigger FIRE or FIRE_AND_PURGE

2016-09-01 Thread Fabian Hueske
Hi Paul,

sorry for the delayed reply.

I think a CountTrigger won't give you the expected result. When you call
trigger() you replace! the existing trigger. In case of a
Sliding/TumblingEventTimeWindow, the trigger that fires at the end of the
window is replaced by a trigger that fires every 10 element. So your window
function will not be called after 24h.
You need to implement a custom trigger, similar to the one in the blog
post. I think you only need to modify the example code such that it does
not sum an attribute (passengerCount) but rather counts how often
onElement() has been invoked.
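To make that concrete, here is a rough, untested sketch against the Flink 1.1
trigger API. The class name and the count threshold are made up; treat it as a
starting point rather than tested code:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires early every maxCount elements and also at the end of the event-time
// window; it never purges, so the window keeps accumulating its elements.
public class CountAndEventTimeTrigger<T> extends Trigger<T, TimeWindow> {
  private static final long serialVersionUID = 1L;

  private final long maxCount;
  private final ValueStateDescriptor<Long> countDesc =
      new ValueStateDescriptor<Long>("count", Long.class, 0L);

  public CountAndEventTimeTrigger(long maxCount) {
    this.maxCount = maxCount;
  }

  @Override
  public TriggerResult onElement(T element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    // keep the regular end-of-window firing
    ctx.registerEventTimeTimer(window.maxTimestamp());

    ValueState<Long> count = ctx.getPartitionedState(countDesc);
    long newCount = count.value() + 1;
    if (newCount >= maxCount) {
      count.update(0L);
      return TriggerResult.FIRE;     // early firing, no purge
    }
    count.update(newCount);
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    // final firing when the watermark passes the end of the window
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(window.maxTimestamp());
    ctx.getPartitionedState(countDesc).clear();
  }
}

You would then plug it in with .trigger(new CountAndEventTimeTrigger<MyEvent>(10))
instead of CountTrigger.of(10), where MyEvent stands for your element type.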

Regarding the CountTrigger that fires several times per element on the
Sliding and only once on Tumbling windows. In case of a tumbling window,
each record is inserted into exactly one window. In case of sliding
windows, each element is inserted into multiple windows (should be six for
a SlidingWindow(1 minute, 10 seconds)). The CountTrigger fires each window
individually.

When using a time window, the WindowFunction has two parameters that
identify the window: 1) the key and 2) the Window object. In case of a time
window, the Window object is a TimeWindow that provides start and end
timestamps.

Another point to consider (esp. for long-running windows as in your case)
is incremental aggregation of window elements [1]. By providing a
FoldFunction (or ReduceFunction), the function is applied for each arriving
element to eagerly aggregate the elements. This means, that the window only
holds the aggregated value and not each individual window element. Hence,
the memory / storage footprint is much better and the window aggregate
does not have to be computed for each early firing. When an element arrives
1) the element is aggregated with the incremental aggregation function 2)
the trigger is evaluated 3) if the trigger fires, the window function is
called.

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#windowfunction-with-incremental-aggregation


2016-08-30 17:59 GMT+02:00 Paul Joireman :

> Fabian,
>
>
> Thanks for the reference, I think I was incorrectly interpreting the
> results I was getting using the CountTrigger, it looks like it does keep
> the data.   However, I'm running into some unexpected (at least by me)
> behavior.   Given a keyed data stream keyedStream and event timing
>
>
> final DataStream alertingMsgs = keyedStream
> .window(SlidingEventTimeWindows.of(Time.minutes(1),
> Time.seconds(10)))
> .trigger(CountTrigger.of(1))
> .apply(new MyWindowProcessor());
>
> Every time a new element comes in I expected (probably naively) one
> firing of the window but I get 5, presumably due to the sliding windows,
> although this probably depends on the Timestamp extraction "policy"; I used
> a BoundedOutOfOrdernessTimestampExtractor(Time.minutes(1)). Is there a
> way in the window processing function to determine which particular sliding
> window you are processing?
>
> Alternatively, a TumblingEventTimeWindow as below only fires once, but
> with the default trigger replaced by CountTrigger, my understanding is that
> the previous windows will not purge, is that correct?
>
> final DataStream alertingMsgs = keyedStream
> .window(TumblingEventTimeWindows.of(Time.minutes(1)))
> .trigger(CountTrigger.of(1))
>     .apply(new MyWindowProcessor());
>
> Paul
>
> --
> *From:* Fabian Hueske 
> *Sent:* Monday, August 29, 2016 5:46:06 PM
> *To:* user@flink.apache.org
> *Subject:* Re: CountTrigger FIRE or FIRE_AND_PURGE
>
> Hi Paul,
>
> This blog post [1] includes an example of an early trigger that should
> pretty much do what you are looking for.
> This one [2] explains the windowing mechanics of Flink (window assigner,
> trigger, function, etc).
>
> Hope this helps,
> Fabian
>
> [1] https://www.mapr.com/blog/essential-guide-streaming-
> first-processing-apache-flink
> [2] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
>
> 2016-08-30 0:25 GMT+02:00 Paul Joireman :
>
>> Hi all,
>>
>>
>> I'm attempting to use long SlidingEventTime window (duration 24 hours)
>> but I would like updates more frequently than the 24 hour length.  I
>> naeively attempted to use a simple CountTrigger(10) to give me the window
>> every time 10 samples are collected, however, the window processing
>> function I'm using only seems to get the latest 10 not the whole window
>> (which I what I was hoping for).   The code looks like it simply fires
>> after the count is reached but it seems like it is doing a FIRE and PURGE,
>> I cant' seem to use the iterator in the window processing function to get
>> more than 10 elements at a time.  Is there something I'm missing in order
>> to get at the full content of the window data.
>>
>>
>> Paul
>>
>
>


Re: Streaming - memory management

2016-09-01 Thread Fabian Hueske
Hi Vinay,

can you give a bit more detail about how you plan to implement the outer
join? Using a WindowFunction or a CoFlatMapFunction on a KeyedStream?

An alternative could be to use a CoGroup operator which collects from two
inputs all elements that share a common key (the join key) and are in the
same window. The interface of the function provides two iterators over the
elements of both inputs and can be used to implement outer join
functionality. The benefit of working with a CoGroupFunction is that you do
not have to take care of state handling at all.
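Just to sketch the idea (untested; I assume s1 and s2 are
DataStream<Tuple2<String, Integer>> with the join key in field f0, and the
window size is a placeholder):

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

DataStream<String> outerJoined = s1
    .coGroup(s2)
    .where(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> t) { return t.f0; }
    })
    .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
        @Override
        public String getKey(Tuple2<String, Integer> t) { return t.f0; }
    })
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<String> out) {
            boolean hasFirst = first.iterator().hasNext();
            boolean hasSecond = second.iterator().hasNext();
            if (hasFirst && hasSecond) {
                // inner part of the outer join
                for (Tuple2<String, Integer> l : first) {
                    for (Tuple2<String, Integer> r : second) {
                        out.collect("match: " + l + " / " + r);
                    }
                }
            } else if (hasFirst) {
                // unmatched records from the first input
                for (Tuple2<String, Integer> l : first) { out.collect("left only: " + l); }
            } else {
                // unmatched records from the second input
                for (Tuple2<String, Integer> r : second) { out.collect("right only: " + r); }
            }
        }
    });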

In case you go for a custom implementation you will need to work with
operator state.
However, you do not need to directly interact with RocksDB. Flink is taking
care of that for you.

Best, Fabian

2016-09-01 16:13 GMT+02:00 vinay patil :

> Hi Fabian/Stephan,
>
> Waiting for your suggestion
>
> Regards,
> Vinay Patil
>
> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8829&i=0>> wrote:
>
>> Hi Fabian/Stephan,
>>
>> This makes things clear.
>>
>> This is the use case I have :
>> I am performing a outer join operation on the two streams (in window)
>> after which I get matchingAndNonMatchingStream, now I want to make sure
>> that the matching rate is high (matching cannot happen if one of the source
>> is not emitting elements for certain time) , so to tackle this situation I
>> was thinking of using RocksDB as a state Backend, where I will insert the
>> unmatched records in it (key - will be same as used for window and value
>> will be DTO ), so before inserting into it I will check if it is already
>> present in RocksDB, if yes I will take the data from it and send it
>> downstream (and ensure I perform the clean operation for that key).
>> (Also the data to store should be encrypted, encryption part can be
>> handled )
>>
>> so instead of using Cassandra , Can I do this using RocksDB as state
>> backend since the state is not gone after checkpointing ?
>>
>> P.S I have kept the watermark behind by 1500 secs just to be safe on
>> handling late elements but to tackle edge case scenarios like the one
>> mentioned above we are having a backup plan of using Cassandra as external
>> store since we are dealing with financial critical data.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Aug 31, 2016 at 11:34 AM, Fabian Hueske <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8829&i=1>> wrote:
>>
>>> Hi Vinaj,
>>>
>>> if you use user-defined state, you have to manually clear it.
>>> Otherwise, it will stay in the state backend (heap or RocksDB) until the
>>> job goes down (planned or due to an OOM error).
>>>
>>> This is esp. important to keep in mind, when using keyed state.
>>> If you have an unbounded, evolving key space you will likely run
>>> out-of-memory.
>>> The job will constantly add state for each new key but won't be able to
>>> clean up the state for "expired" keys.
>>>
>>> You could implement a clean-up mechanism this if you implement a custom
>>> stream operator.
>>> However this is a very low level interface and requires solid
>>> understanding
>>> of the internals like timestamps, watermarks and the checkpointing
>>> mechanism.
>>>
>>> The community is currently working on a state expiry feature (state will
>>> be
>>> discarded if not requested or updated for x minutes).
>>>
>>> Regarding the second question: Does state remain local after
>>> checkpointing?
>>> Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but
>>> remains in the operator. So the state is not gone after a checkpoint is
>>> completed.
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=2>>:
>>>
>>> > Hi Stephan,
>>> >
>>> > Just wanted to jump into this discussion regarding state.
>>> >
>>> > So do you mean that if we maintain user-defined state (for non-window
>>> > operators), then if we do  not clear it explicitly will the data for
>>> that
>>> > key remains in RocksDB.
>>> >
>>> > What happens in case of checkpoint ? I read in the documentation that
>>> after
>>> > the checkpoint happens the rocksDB data is pushed to the desired
>>> lo

Re: Streaming - memory management

2016-09-01 Thread Fabian Hueske
Thanks for the explanation. I think I understood your use case.

Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
stream (keyed by join key).
One input would be the unmatched outer join records, the other input would
serve the events you want to match them with.
Retrieving elements from RocksDB will be local and should be fast.

You should be confident though, that all unmatched records will be picked up
at some point (RocksDB persists to disk, so you won't run out of memory, but
the snapshot size will increase).
The future state expiry feature will avoid such situations.

Best, Fabian

2016-09-01 18:29 GMT+02:00 vinay patil :

> Hi Fabian,
>
> I had already used Co-Group function earlier but were getting some issues
> while dealing with watermarks (for one use case I was not getting the
> correct result), so I have used the union operator for performing the
> outer-join (WindowFunction on a keyedStream), this approach is working
> correctly and giving me correct results.
>
> As I have discussed the scenario, I want to maintain the non-matching
> records in some store, so that's why I was thinking of using RocksDB as a
> store here, where I will maintain the user-defined state  after the
> outer-join window operator, and I can query it using Flink to check if the
> value for a particular key is present or not , if present I can match them
> and send it downstream.
>
> The final goal is to have zero non-matching records, so this is the backup
> plan to handle edge-case scenarios.
>
> I have already integrated code to write to Cassandra using Flink
> Connector, but I think this will be a better option rather than hitting the
> query to external store since RocksDb will store the data to local TM disk,
> the retrieval will be faster here than Cassandra , right ?
>
> What do you think ?
>
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8836&i=0>> wrote:
>
>> Hi Vinay,
>>
>> can you give a bit more detail about how you plan to implement the outer
>> join? Using a WIndowFunction or a CoFlatMapFunction on a KeyedStream?
>>
>> An alternative could be to use a CoGroup operator which collects from two
>> inputs all elements that share a common key (the join key) and are in the
>> same window. The interface of the function provides two iterators over the
>> elements of both inputs and can be used to implement outer join
>> functionality. The benefit of working with a CoGroupFunction is that you do
>> not have to take care of state handling at all.
>>
>> In case you go for a custom implementation you will need to work with
>> operator state.
>> However, you do not need to directly interact with RocksDB. Flink is
>> taking care of that for you.
>>
>> Best, Fabian
>>
>> 2016-09-01 16:13 GMT+02:00 vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8832&i=0>>:
>>
>>> Hi Fabian/Stephan,
>>>
>>> Waiting for your suggestion
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Aug 31, 2016 at 1:46 PM, Vinay Patil <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8829&i=0>> wrote:
>>>
>>>> Hi Fabian/Stephan,
>>>>
>>>> This makes things clear.
>>>>
>>>> This is the use case I have :
>>>> I am performing a outer join operation on the two streams (in window)
>>>> after which I get matchingAndNonMatchingStream, now I want to make sure
>>>> that the matching rate is high (matching cannot happen if one of the source
>>>> is not emitting elements for certain time) , so to tackle this situation I
>>>> was thinking of using RocksDB as a state Backend, where I will insert the
>>>> unmatched records in it (key - will be same as used for window and value
>>>> will be DTO ), so before inserting into it I will check if it is already
>>>> present in RocksDB, if yes I will take the data from it and send it
>>>> downstream (and ensure I perform the clean operation for that key).
>>>> (Also the data to store should be encrypted, encryption part can be
>>>> handled )
>>>>
>>>> so instead of using Cassandra , Can I do this using RocksDB as state
>>>> backend since the state is not gone after checkpointing ?
>>>>
>>>> P.S I have kept the watermark behind by 1500 secs just to be safe on
>>>> handling late elem

Re: Streaming - memory management

2016-09-01 Thread Fabian Hueske
I thought you would like to join the non-matched elements with another
(third) stream.

--> s1.union(s2).keyBy().window().apply(//
outerjoin).keyBy.connect(s3.keyBy).coFlatMap(// backup join)

If you want to match the non-matched stream with itself a FlatMapFunction
is the right choice.

--> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
backup join)

The backup join puts all non-matched elements in the state and waits for
another non-matched element with the same key to do the join.

Best, Fabian



2016-09-01 19:55 GMT+02:00 vinay patil :

> Yes, that's what I am looking for.
>
> But why to use CoFlatMapFunction , I have already got the
> matchingAndNonMatching Stream , by doing the union of two streams and
> having the logic in apply method for performing outer-join.
>
> I am thinking of applying the same key on matchingAndNonMatching and
> flatmap to take care of rest logic.
>
> Or are you suggesting to use Co-FlatMapFunction after the outer-join
> operation  (I mean after doing the window and
> getting matchingAndNonMatching stream )?
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8842&i=0>> wrote:
>
>> Thanks for the explanation. I think I understood your usecase.
>>
>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>> stream (keyed by join key).
>> One input would be the unmatched outer join records, the other input
>> would serve the events you want to match them with.
>> Retrieving elements from RocksDB will be local and should be fast.
>>
>> You should be confident though, that all unmatched record will be picked
>> up at some point (RocksDB persists to disk, so you won't run out of memory
>> but snapshots size will increase).
>> The future state expiry feature will avoid such situations.
>>
>> Best, Fabian
>>
>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8837&i=0>>:
>>
>>> Hi Fabian,
>>>
>>> I had already used Co-Group function earlier but were getting some
>>> issues while dealing with watermarks (for one use case I was not getting
>>> the correct result), so I have used the union operator for performing the
>>> outer-join (WindowFunction on a keyedStream), this approach is working
>>> correctly and giving me correct results.
>>>
>>> As I have discussed the scenario, I want to maintain the non-matching
>>> records in some store, so that's why I was thinking of using RocksDB as a
>>> store here, where I will maintain the user-defined state  after the
>>> outer-join window operator, and I can query it using Flink to check if the
>>> value for a particular key is present or not , if present I can match them
>>> and send it downstream.
>>>
>>> The final goal is to have zero non-matching records, so this is the
>>> backup plan to handle edge-case scenarios.
>>>
>>> I have already integrated code to write to Cassandra using Flink
>>> Connector, but I think this will be a better option rather than hitting the
>>> query to external store since RocksDb will store the data to local TM disk,
>>> the retrieval will be faster here than Cassandra , right ?
>>>
>>> What do you think ?
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 10:19 AM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8836&i=0>> wrote:
>>>
>>>> Hi Vinay,
>>>>
>>>> can you give a bit more detail about how you plan to implement the
>>>> outer join? Using a WIndowFunction or a CoFlatMapFunction on a KeyedStream?
>>>>
>>>> An alternative could be to use a CoGroup operator which collects from
>>>> two inputs all elements that share a common key (the join key) and are in
>>>> the same window. The interface of the function provides two iterators over
>>>> the elements of both inputs and can be used to implement outer join
>>>> functionality. The benefit of working with a CoGroupFunction is that you do
>>>> not have to take care of state handling at all.
>>>>
>>>> In case you go for a custom implementation you will need to work with
>>>> operator state.
>>>> However, you do not need to direct

Re: Windows and Watermarks Clarification

2016-09-01 Thread Fabian Hueske
Hi Paul,

BoundedOutOfOrdernessTimestampExtractor implements the
AssignerWithPeriodicWatermarks interface.
This means, Flink will ask the assigner in regular intervals (configurable
via StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval()) for
the current watermark.
The watermark will be 10secs earlier than the highest observed timestamp so
far.

An event-time window is evaluated when the current watermark is higher /
later than the window's end time. With allowedLateness() the window
evaluation can be deferred to allow late elements (elements whose timestamp
is before the current watermark) to join the window before it is evaluated.
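For example (sketch only; the key field, window size, and functions are
placeholders):

stream
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyElement>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyElement element) {
                return element.getTimestamp();
            }
        })
    .keyBy("sensorId")
    .window(TumblingEventTimeWindows.of(Time.minutes(10)))
    .allowedLateness(Time.minutes(5))   // defer eviction to admit late elements
    .apply(new MyWindowFunction());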

Let me know if you have further questions,
Fabian


2016-09-01 20:16 GMT+02:00 Paul Joireman :

> Hi all,
>
>
> Just a point of clarification on how watermarks are generated.   I'd like
> to use a SlidingEventTime window of say 5 minutes with a 30 second slide.
> The incoming data stream has elements from which I can extract the
> timestamp but they may come out of order so I chose to implement the
> following timestamp assigner.
>
>
>  my_stream.assignTimestampsAndWatermarks(
>   new BoundedOutOfOrdernessTimestampExtractor<MyElement>(Time.seconds(10)) {
>   @Override
>   public long extractTimestamp(final MyElement element) {
>   return element.getTimestamp();
>   }
>   });
>
> With this definition and the code for BoundedOutOfOrdernessTimestampExtractor,
> my understanding is that for each incoming element a watermark will be
> generated that is 10 seconds behind the current timestamp. If the
> end time of any of the sliding windows is earlier than an emitted watermark,
> that (or those) windows will fire, initiating processing on the window(s).
>   Is this correct?
>
> Paul
>
>


Re: Streaming - memory management

2016-09-01 Thread Fabian Hueske
You do not have to convert your DTO into a JSON object to use it as a
key-value state in a Flink function.
You can pass it as it is via the state interfaces.

Can you point me to the documentation that you find confusing? The state
documentation [1] says:

>> You can make *every* transformation (map, filter, etc) stateful by using
Flink’s state interface or checkpointing instance fields of your function.
>> You can register any instance field as *managed* state by implementing
an interface.
>> In this case, and also in the case of using Flink’s native state
interface, Flink will automatically take consistent snapshots of your state
periodically, and restore its value in the case of a failure.

Is that unclear/confusing or are you referring to a different paragraph?

Thanks, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html

2016-09-01 20:22 GMT+02:00 vinay patil :

> I don't want to join the third stream.
>
> And yes, this is what I was thinking of also:
> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(// backup
> join)
>
>
> I am already done integrating with Cassandra but I feel RocksDB will be a
> better option, I will have to take care of the clearing part as you have
> suggested, will check that in documentation.
>
> I have the DTO with almost 50 fields , converting it to JSON and storing
> it as a state should not be a problem , or there is no harm in storing the
> DTO ?
>
> I think the documentation should specify the point that the state will be
> maintained for user-defined operators to avoid confusion.
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8845&i=0>> wrote:
>
>> I thought you would like to join the non-matched elements with another
>> (third) stream.
>>
>> --> s1.union(s2).keyBy().window().apply(// 
>> outerjoin).keyBy.connect(s3.keyBy).coFlatMap(//
>> backup join)
>>
>> If you want to match the non-matched stream with itself a FlatMapFunction
>> is the right choice.
>>
>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>> backup join)
>>
>> The backup join puts all non-match elements in the state and waits for
>> another non-matched element with the same key to do the join.
>>
>> Best, Fabian
>>
>>
>>
>> 2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8843&i=0>>:
>>
>>> Yes, that's what I am looking for.
>>>
>>> But why to use CoFlatMapFunction , I have already got the
>>> matchingAndNonMatching Stream , by doing the union of two streams and
>>> having the logic in apply method for performing outer-join.
>>>
>>> I am thinking of applying the same key on matchingAndNonMatching and
>>> flatmap to take care of rest logic.
>>>
>>> Or are you suggestion to use Co-FlatMapFunction after the outer-join
>>> operation  (I mean after doing the window and
>>> getting matchingAndNonMatching stream )?
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 11:38 AM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8842&i=0>> wrote:
>>>
>>>> Thanks for the explanation. I think I understood your usecase.
>>>>
>>>> Yes, I'd go for the RocksDB approach in a CoFlatMapFunction on a keyed
>>>> stream (keyed by join key).
>>>> One input would be the unmatched outer join records, the other input
>>>> would serve the events you want to match them with.
>>>> Retrieving elements from RocksDB will be local and should be fast.
>>>>
>>>> You should be confident though, that all unmatched record will be
>>>> picked up at some point (RocksDB persists to disk, so you won't run out of
>>>> memory but snapshots size will increase).
>>>> The future state expiry feature will avoid such situations.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-01 18:29 GMT+02:00 vinay patil <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8837&i=0>>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> I had already used Co-Group function earlier but were getting some
>>>>> issues while dealing with watermarks (f

Re: Windows and Watermarks Clarification

2016-09-01 Thread Fabian Hueske
A 10 minute tumbling window that starts at 12:00 is evaluated after a
watermark is observed that is > 12:10.
If the same tumbling window has an allowed lateness of 5 minutes, it is
evaluated once a watermark > 12:15 is observed. However, only elements with
timestamps 12:00 <= x < 12:10 are in the window.
Elements that arrive even after the allowed lateness period are simply
dropped.

Best, Fabian

2016-09-01 20:42 GMT+02:00 Paul Joireman :

> Thanks Fabian,
>
>
> This is making more sense.  Is allowedLateness(Time.seconds(x)) then
> evaluated relative to maxEventTime - lastWaterMarkTime.   So if (maxEventTime
> - lastWaterMarkTime) > x * 1000 then the window is evaluated?
>
>
> Paul
> ------
> *From:* Fabian Hueske 
> *Sent:* Thursday, September 1, 2016 1:25:55 PM
> *To:* user@flink.apache.org
> *Subject:* Re: Windows and Watermarks Clarification
>
> Hi Paul,
>
> BoundedOutOfOrdernessTimestampExtractor implements the
> AssignerWithPeriodicWatermarks interface.
> This means, Flink will ask the assigner in regular intervals (configurable
> via StreamExecutionEnvironment.getConfig().setAutoWatermarkInterval())
> for the current watermark.
> The watermark will be 10secs earlier than the highest observed timestamp
> so far.
>
> An event-time window is evaluated when the current watermark is higher /
> later than the window's end time. With allowedLateness() the window
> evaluation can be deferred to allow late elements (elements whose timestamp
> is before the current watermark) to join the window before it is evaluated.
>
> Let me know if you have further questions,
> Fabian
>
>
> 2016-09-01 20:16 GMT+02:00 Paul Joireman :
>
>> Hi all,
>>
>>
>> Just a point of clarification on how watermarks are generated.   I'd like
>> to use a SlidingEventTime window of say 5 minutes with a 30 second slide.
>> The incoming data stream has elements from which I can extract the
>> timestamp but they may come out of order so I chose to implement the
>> following timestamp assigner.
>>
>>
>>  my_stream.assignTimestampsAndWatermarks(
>>   new 
>> BoundedOutOfOrdernessTimestampExtractor(Time.seconds(10))
>> {
>>   @Override
>>   public long extractTimestamp(final MyElement element) {
>>   return element.getTimestamp();
>>   }
>>   });
>>
>> With this definition and the code for 
>> BoundedOutOfOrdernessTimestampExtractor,
>> my understanding is that for each incoming element a watermark will be
>> generated that is 10 seconds behind the current timestamp.If any the
>> end time of any of the sliding windows is earlier that an emitted watermark
>> that (or those) windows will fire initiating a processing on the window(s).
>>   Is this correct?
>>
>> Paul
>>
>>
>


Re: [SUGGESTION] Stack Overflow Documentation for Apache Flink

2016-09-05 Thread Fabian Hueske
Thanks for the suggestion Vishnu!
Stackoverflow documentation looks great. I like the easy contribution and
versioning features.

However, I am a bit skeptical. IMO, Flink's primary documentation must be
hosted by Apache. Out-sourcing such an important aspect of a project to an
external service is not an option for me.
This would mean, that documentation on SO would be an additional /
secondary documentation. I see two potential problems with that:

- It is duplicate effort to keep two sets of documentation up-to-date. Adding a
new feature or changing some behavior must be documented in two places.
- Efforts to improve documentation might split up, i.e., the primary
documentation might receive less improvements and contributions.

Of course, this is just my opinion but I think it is worth to mention these
points.

Thanks,
Fabian

2016-09-05 12:22 GMT+02:00 Ravikumar Hawaldar 
:

> Hi,
>
>
> I just committed to apache-flink documentation on SO, one more commit
> required. Nice idea to document on SO Vishnu.
>
>
>
> Regards,
>
> Ravikumar
>
> On 5 September 2016 at 14:22, Maximilian Michels  wrote:
>
>> Hi!
>>
>> This looks neat. Let's try it out. I just voted.
>>
>> Cheers,
>> Max
>>
>> On Sun, Sep 4, 2016 at 8:11 PM, Vishnu Viswanath
>>  wrote:
>> > Hi All,
>> >
>> > Why don't we make use of Stackoverflow's new documentation feature to do
>> > some documentation of Apache Flink.
>> >
>> > To start, at least 5 SO users should commit to document, who has at
>> least150
>> > reputation and have at least 1 positively scored answer in Flink tag.
>> >
>> > http://stackoverflow.com/documentation/apache-flink
>> >
>> > Regards,
>> > Vishnu Viswanath
>>
>
>


Re: Sharing Java Collections within Flink Cluster

2016-09-07 Thread Fabian Hueske
Hi,

Flink does not provide shared state.
However, you can broadcast a stream to a CoFlatMapFunction, such that each
operator has its own local copy of the state.
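A minimal sketch of the broadcast variant (the element types and the lookup
logic are placeholders; note that the plain HashMap below is not checkpointed,
fault-tolerant state):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Map;

// Each parallel instance keeps its own copy of the broadcasted map.
public class BroadcastLookup implements CoFlatMapFunction<String, String, String> {
    private final Map<String, String> localCopy = new HashMap<>();

    @Override
    public void flatMap1(String event, Collector<String> out) {
        // high-volume stream: check the event against the local copy
        if (localCopy.containsKey(event)) {
            out.collect(event);
        }
    }

    @Override
    public void flatMap2(String update, Collector<String> out) {
        // broadcasted stream: every parallel instance sees every update
        localCopy.put(update, update);
    }
}

// usage: events.connect(updates.broadcast()).flatMap(new BroadcastLookup());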

If that does not work for you because the state is too large and if it is
possible to partition the state (and both streams), you can also use keyBy
instead of broadcast.

Finally, you can use an external system like a KeyValue Store or In-Memory
store like Apache Ignite to hold your distributed collection.

Best, Fabian

2016-09-07 17:49 GMT+02:00 Chakravarthy varaga :

> Hi Team,
>
>  Can someone help me here? Appreciate any response !
>
> Best Regards
> Varaga
>
> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
> chakravarth...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I'm working on a Flink Streaming application. The data is injected
>> through Kafka connectors. The payload volume is roughly 100K/sec. The event
>> payload is a string. Let's call this as DataStream1.
>> This application also uses another DataStream, call it DataStream2,
>> (consumes events off a kafka topic). The elements of this DataStream2
>> involves in a certain transformation that finally updates a Hashmap(/Java
>> util Collection). Apparently the flink application should share this
>> HashMap across the flink cluster so that DataStream1 application could
>> check the state of the values in this collection. Is there a way to do this
>> in Flink?
>>
>> I don't see any Shared Collection used within the cluster?
>>
>> Best Regards
>> CVP
>>
>
>


Re: Sharing Java Collections within Flink Cluster

2016-09-07 Thread Fabian Hueske
Is writing DataStream2 to a Kafka topic and reading it from the other job
an option?

2016-09-07 19:07 GMT+02:00 Chakravarthy varaga :

> Hi Fabian,
>
> Thanks for your response. Apparently these DataStream
> (Job1-DataStream1 & Job2-DataStream2) are from different flink applications
> running within the same cluster.
> DataStream2 (from Job2) applies transformations and updates a 'cache'
> on which (Job1) needs to work on.
> Our intention is to not use the external key/value store as we are
> trying to localize the cache within the cluster.
> Is there a way?
>
> Best Regards
> CVP
>
> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> Flink does not provide shared state.
>> However, you can broadcast a stream to CoFlatMapFunction, such that each
>> operator has its own local copy of the state.
>>
>> If that does not work for you because the state is too large and if it is
>> possible to partition the state (and both streams), you can also use keyBy
>> instead of broadcast.
>>
>> Finally, you can use an external system like a KeyValue Store or
>> In-Memory store like Apache Ignite to hold your distributed collection.
>>
>> Best, Fabian
>>
>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga 
>> :
>>
>>> Hi Team,
>>>
>>>  Can someone help me here? Appreciate any response !
>>>
>>> Best Regards
>>> Varaga
>>>
>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>> chakravarth...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I'm working on a Flink Streaming application. The data is injected
>>>> through Kafka connectors. The payload volume is roughly 100K/sec. The event
>>>> payload is a string. Let's call this as DataStream1.
>>>> This application also uses another DataStream, call it DataStream2,
>>>> (consumes events off a kafka topic). The elements of this DataStream2
>>>> involves in a certain transformation that finally updates a Hashmap(/Java
>>>> util Collection). Apparently the flink application should share this
>>>> HashMap across the flink cluster so that DataStream1 application could
>>>> check the state of the values in this collection. Is there a way to do this
>>>> in Flink?
>>>>
>>>> I don't see any Shared Collection used within the cluster?
>>>>
>>>> Best Regards
>>>> CVP
>>>>
>>>
>>>
>>
>


Re: Sharing Java Collections within Flink Cluster

2016-09-07 Thread Fabian Hueske
Operator state is always local in Flink. However, with key-value state, you
can have something which behaves kind of similar to a distribute hashmap,
because each operator holds a different shard/partition of the hashtable.

If you have to do only a single key lookup for each element of DS1, you
should think about partitioning both streams (keyBy) and writing the state
into Flink's key-value state [1].

This will have several benefits:
1) State does not need to be replicated
2) Depending on the backend (RocksDB) [2], parts of the state can reside on
disk. You are not bound to the memory of the JVM.
3) Flink takes care of the look-up. No need to have your own hashmap.
4) It will only be possible to rescale jobs with key-value state (this
feature is currently under development).

If using the key-value state is possible, I'd go for that.
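For completeness, a small sketch of enabling the RocksDB backend so keyed state
can spill to disk (the checkpoint URI and interval are placeholders; the
RocksDBStateBackend constructor declares an IOException that your setup code
needs to handle or declare):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// keyed state is then kept in RocksDB and snapshotted to the given URI
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60000);   // checkpoint every 60 seconds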

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state_backends.html

2016-09-07 19:55 GMT+02:00 Chakravarthy varaga :

> certainly, what I thought as well...
> The output of DataStream2 could be in 1000s and there are state updates...
> reading this topic from the other job, job1, is okie.
> However, assuming that we maintain this state into a collection, and
> updating the state (by reading from the topic) in this collection, will
> this be replicated across the cluster within this job1 ?
>
>
>
> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske  wrote:
>
>> Is writing DataStream2 to a Kafka topic and reading it from the other job
>> an option?
>>
>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga 
>> :
>>
>>> Hi Fabian,
>>>
>>> Thanks for your response. Apparently these DataStream
>>> (Job1-DataStream1 & Job2-DataStream2) are from different flink applications
>>> running within the same cluster.
>>> DataStream2 (from Job2) applies transformations and updates a
>>> 'cache' on which (Job1) needs to work on.
>>> Our intention is to not use the external key/value store as we are
>>> trying to localize the cache within the cluster.
>>> Is there a way?
>>>
>>> Best Regards
>>> CVP
>>>
>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske  wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink does not provide shared state.
>>>> However, you can broadcast a stream to CoFlatMapFunction, such that
>>>> each operator has its own local copy of the state.
>>>>
>>>> If that does not work for you because the state is too large and if it
>>>> is possible to partition the state (and both streams), you can also use
>>>> keyBy instead of broadcast.
>>>>
>>>> Finally, you can use an external system like a KeyValue Store or
>>>> In-Memory store like Apache Ignite to hold your distributed collection.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2016-09-07 17:49 GMT+02:00 Chakravarthy varaga <
>>>> chakravarth...@gmail.com>:
>>>>
>>>>> Hi Team,
>>>>>
>>>>>  Can someone help me here? Appreciate any response !
>>>>>
>>>>> Best Regards
>>>>> Varaga
>>>>>
>>>>> On Mon, Sep 5, 2016 at 4:51 PM, Chakravarthy varaga <
>>>>> chakravarth...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> I'm working on a Flink Streaming application. The data is
>>>>>> injected through Kafka connectors. The payload volume is roughly 
>>>>>> 100K/sec.
>>>>>> The event payload is a string. Let's call this as DataStream1.
>>>>>> This application also uses another DataStream, call it DataStream2,
>>>>>> (consumes events off a kafka topic). The elements of this DataStream2
>>>>>> involves in a certain transformation that finally updates a Hashmap(/Java
>>>>>> util Collection). Apparently the flink application should share this
>>>>>> HashMap across the flink cluster so that DataStream1 application could
>>>>>> check the state of the values in this collection. Is there a way to do 
>>>>>> this
>>>>>> in Flink?
>>>>>>
>>>>>> I don't see any Shared Collection used within the cluster?
>>>>>>
>>>>>> Best Regards
>>>>>> CVP
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Sharing Java Collections within Flink Cluster

2016-09-07 Thread Fabian Hueske
That depends.
1) Growing/Shrinking: This should work. New entries can always be inserted.
In order to remove entries from the k-v-state you have to set the value to
null. Note that you need an explicit delete-value record to trigger the
eviction.
2) Multiple lookups: This only works if all lookups are independent
of each other. You can partition DS1 only on a single key and the other
keys might be located on different shards. A workaround might be to
duplicate S1 events for each key that they need to look up. However, you
might need to collect events from the same S1 event after the join. If that
does not work for you, the only thing that comes to my mind is to broadcast
the state and keep a full local copy in each operator.

Let me add one more thing regarding the upcoming rescaling feature. If this
is interesting for you, rescaling will also work (maybe not in the first
version) for broadcasted state, i.e. state which is the same on all
parallel operator instances.

2016-09-07 21:45 GMT+02:00 Chakravarthy varaga :

> I'm understanding this better with your explanation..
> With this use case, each element in DS1 has to look up against a 'bunch
> of keys' from DS2, and DS2 could shrink/expand in terms of the no. of
> keys. Will the key-value shard work in this case?
>
> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske  wrote:
>
>> Operator state is always local in Flink. However, with key-value state,
>> you can have something which behaves kind of similar to a distribute
>> hashmap, because each operator holds a different shard/partition of the
>> hashtable.
>>
>> If you have to do only a single key lookup for each element of DS1, you
>> should think about partitioning both streams (keyBy) and writing the state
>> into Flink's key-value state [1].
>>
>> This will have several benefits:
>> 1) State does not need to be replicated
>> 2) Depending on the backend (RocksDB) [2], parts of the state can reside
>> on disk. You are not bound to the memory of the JVM.
>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>> 4) It will only be possible to rescale jobs with key-value state (this
>> feature is currently under development).
>>
>> If using the key-value state is possible, I'd go for that.
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>> apis/streaming/state.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>> apis/streaming/state_backends.html
>>
>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga 
>> :
>>
>>> certainly, what I thought as well...
>>> The output of DataStream2 could be in 1000s and there are state
>>> updates...
>>> reading this topic from the other job, job1, is okie.
>>> However, assuming that we maintain this state into a collection, and
>>> updating the state (by reading from the topic) in this collection, will
>>> this be replicated across the cluster within this job1 ?
>>>
>>>
>>>
>>> On Wed, Sep 7, 2016 at 6:33 PM, Fabian Hueske  wrote:
>>>
>>>> Is writing DataStream2 to a Kafka topic and reading it from the other
>>>> job an option?
>>>>
>>>> 2016-09-07 19:07 GMT+02:00 Chakravarthy varaga <
>>>> chakravarth...@gmail.com>:
>>>>
>>>>> Hi Fabian,
>>>>>
>>>>> Thanks for your response. Apparently these DataStream
>>>>> (Job1-DataStream1 & Job2-DataStream2) are from different flink 
>>>>> applications
>>>>> running within the same cluster.
>>>>> DataStream2 (from Job2) applies transformations and updates a
>>>>> 'cache' on which (Job1) needs to work on.
>>>>> Our intention is to not use the external key/value store as we are
>>>>> trying to localize the cache within the cluster.
>>>>> Is there a way?
>>>>>
>>>>> Best Regards
>>>>> CVP
>>>>>
>>>>> On Wed, Sep 7, 2016 at 5:00 PM, Fabian Hueske 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Flink does not provide shared state.
>>>>>> However, you can broadcast a stream to CoFlatMapFunction, such that
>>>>>> each operator has its own local copy of the state.
>>>>>>
>>>>>> If that does not work for you because the state is too large and if
>>>>>> it is possible to partition the state (and both streams), you can also
>>>>>> use keyBy instead of broadcast.

Re: Sharing Java Collections within Flink Cluster

2016-09-08 Thread Fabian Hueske
Hi Pushpendra,

1. Queryable state is an upcoming feature and not part of an official
release yet. With queryable state you can query operator state from outside
the application.

2. Have you had a look at the CoFlatMap operator? This operator "connects"
two streams and allows you to keep state that is accessible from both streams.

Best, Fabian

2016-09-08 8:54 GMT+02:00 pushpendra.jaiswal :

> Hi Fabian
> I am also looking for this solution, could you help me with two things:
>
> 1. How this is different from Queryable state.
>
> 2. How to query this key-value state from DS2 even if its running in  the
> same application.
>
> e.g.
>
> val keyedStream = stream.keyby(_.key)
> val otherStream = somekafka.createStream
>
> The final goal is to have something like:
>
> otherStream.foreach(kafkamessage => keyedStream.lookup(kafkamessage.key))
>
> ~Pushpendra Jaiswal
>
>
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Sharing-Java-
> Collections-within-Flink-Cluster-tp8919p8965.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: assignTimestamp after keyBy

2016-09-08 Thread Fabian Hueske
I would assign timestamps directly at the source.
Timestamps are not stripped off by operators.

Reassigning timestamps somewhere in the middle of a job can cause very
unexpected results.
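
In other words, put the assigner between the source and the keyBy, and key the
stream afterwards. A rough sketch in the Java API (the source, assigner, and
function classes are placeholders):

DataStream<MyEvent> withTimestamps = env
    .addSource(new MyEventSource())
    .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestamps
    .keyBy("group")
    .timeWindow(Time.seconds(10))
    .reduce(new MyReduceFunction())
    .addSink(new MySink());

assignTimestampsAndWatermarks() indeed returns a DataStream, so the keyBy (and
the windowing) simply comes after it; there is no need to cast back to a
KeyedStream.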

2016-09-08 9:32 GMT+02:00 Dong-iL, Kim :

> Thanks for replying. pushpendra.
> The assignTimestamp method returns not KeyedStream but DataStream.
> so I cannot use windowing.
> is it possible casting to KeyedStream?
> Regards
>
> > On Sep 8, 2016, at 3:12 PM, pushpendra.jaiswal <
> pushpendra.jaiswa...@gmail.com> wrote:
> >
> > Please refer
> > https://ci.apache.org/projects/flink/flink-docs-
> master/dev/event_timestamps_watermarks.html
> > for assigning timestamps.
> >
> > You can do map after keyby to assign timestamps
> >
> > e.g:
> >
> > val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
> >.filter( _.severity == WARNING )
> >.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
> >
> > withTimestampsAndWatermarks
> >.keyBy( _.getGroup )
> >.timeWindow(Time.seconds(10))
> >.reduce( (a, b) => a.add(b) )
> >.addSink(...
> >
> > ~Pushpendra
> >
> >
> >
> > --
> > View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/assignTimestamp-
> after-keyBy-tp8962p8964.html
> > Sent from the Apache Flink User Mailing List archive. mailing list
> archive at Nabble.com.
>
>


Re: scala version of flink mongodb example

2016-09-08 Thread Fabian Hueske
Hi Frank,

input should be of type DataSet[(BSONWritable, BSONWritable)], i.e. a
Tuple2[BSONWritable, BSONWritable], right?

Something like this should work:

input.map( pair => pair._1.toString )

Pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the key
of the pair.

Alternatively you can also add an import
org.apache.flink.api.scala.extensions._

and then you can do

input.mapWith { case (x, y) => x }

Best, Fabian


2016-09-08 18:30 GMT+02:00 Frank Dekervel :

> Hello,
>
> i'm new to flink, and i'm trying to get a mongodb hadoop input format
> working in scala. However, i get lost in the scala generics system ...
> could somebody help me ?
>
> Code is below, neither version works (compile error at the "map" call),
> either because of method not applicable either because of ambiguous
> reference to overloaded method map (flink 1.0.3)
>
> Thanks already
> greetings,
> Frank
>
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.scala.DataSet;
> import org.apache.flink.api.scala.ExecutionEnvironment;
> import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
>
> import org.apache.hadoop.mapred.JobConf;
> import org.bson.BSONObject;
>
> import com.mongodb.BasicDBObject;
> import com.mongodb.hadoop.io.BSONWritable;
> import com.mongodb.hadoop.mapred.MongoInputFormat;
>
> val hdIf = new HadoopInputFormat(new MongoInputFormat(),classOf[BSONWritable],
> classOf[BSONWritable], new JobConf())
>
> hdIf.getJobConf().set("mongo.input.uri",
> "mongodb://localhost:27017/handling.event");
>
> val input = env.createInput(hdIf);
>
> def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
> return t1.toString()
> }
>
> // does not work
> //input.map mapfunc
>
> // does not work either
> input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
> // does not work either
> //input.map ( (t1, t2) => t1 )
>
>


Re: scala version of flink mongodb example

2016-09-08 Thread Fabian Hueske
Hi Frank,

I didn't try to run the code, but this does not show a compiler error in
IntelliJ:

> input.map( mapfunc2 _ )

Decomposing the Tuple2 into two separate arguments only works with
Scala's pattern matching technique (this is the second approach I posted).
The Java API is not capable of splitting the fields of a Tuple2 argument
into separate arguments.

Best, Fabian


2016-09-08 20:54 GMT+02:00 Frank Dekervel :

> Hello Fabian,
>
> Thanks, your solution works indeed. however, i don't understand why.
> When i replace the lambda by an explicit function
>
> def mapfunc2(pair: Tuple2[BSONWritable, BSONWritable]) : String = {
> return pair._1.toString
> }
> input.map mapfunc2
>
>
> i get the error below, which seemingly indicates that my method call maps
> both to the scala version (first overloaded method) and the java version
> (which works with a MapFunction, second one in the error message)
>
> this was also the error i got when doing the following (which looks the
> most logical to me)
>
> def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
> return t1.toString()
> }
> input.map mapfunc
>
> it would seem logical to me to decompose the pair as 2 separate arguments
> (which is what the java version of the example also does at
> https://github.com/okkam-it/flink-mongodb-test)
>
> and this is the error message:
>
> both method map in class DataSet of type [R](fun: 
> ((com.mongodb.hadoop.io.BSONWritable,
> com.mongodb.hadoop.io.BSONWritable)) => R)(implicit evidence$4:
> org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit
> evidence$5: scala.reflect.ClassTag[R])org.apache.flink.api.scala.
> DataSet[R]
> and method map in class DataSet of type [R](mapper:
> org.apache.flink.api.common.functions.MapFunction[(com.mongodb.hadoop.io.BSONWritable,
> com.mongodb.hadoop.io.BSONWritable),R])(implicit evidence$2:
> org.apache.flink.api.common.typeinfo.TypeInformation[R], implicit
> evidence$3: scala.reflect.ClassTag[R])org.apache.flink.api.scala.
> DataSet[R]
> match expected type ?
>
> Thanks!
> Frank
>
>
> On Thu, Sep 8, 2016 at 6:56 PM, Fabian Hueske  wrote:
>
>> Hi Frank,
>>
>> input should be of DataSet[(BSONWritable, BSONWritable)], so a
>> Tuple2[BSONWritable, BSONWritable], right?
>>
>> Something like this should work:
>>
>> input.map( pair => pair._1.toString )
>>
>> Pair is a Tuple2[BSONWritable, BSONWritable], and pair._1 accesses the
>> key of the pair.
>>
>> Alternatively you can also add an import org.apache.flink.api.scala.ext
>> ensions._
>>
>> and then you can do
>>
>> input.mapWith { case (x, y) => x }
>>
>> Best, Fabian
>>
>>
>> 2016-09-08 18:30 GMT+02:00 Frank Dekervel :
>>
>>> Hello,
>>>
>>> i'm new to flink, and i'm trying to get a mongodb hadoop input format
>>> working in scala. However, i get lost in the scala generics system ...
>>> could somebody help me ?
>>>
>>> Code is below, neither version works (compile error at the "map" call),
>>> either because of method not applicable either because of ambiguous
>>> reference to overloaded method map (flink 1.0.3)
>>>
>>> Thanks already
>>> greetings,
>>> Frank
>>>
>>>
>>> import org.apache.flink.api.common.functions.MapFunction;
>>> import org.apache.flink.api.scala.DataSet;
>>> import org.apache.flink.api.scala.ExecutionEnvironment;
>>> import org.apache.flink.api.scala.hadoop.mapred.HadoopInputFormat;
>>>
>>> import org.apache.hadoop.mapred.JobConf;
>>> import org.bson.BSONObject;
>>>
>>> import com.mongodb.BasicDBObject;
>>> import com.mongodb.hadoop.io.BSONWritable;
>>> import com.mongodb.hadoop.mapred.MongoInputFormat;
>>>
>>> val hdIf = new HadoopInputFormat(new 
>>> MongoInputFormat(),classOf[BSONWritable],
>>> classOf[BSONWritable], new JobConf())
>>>
>>> hdIf.getJobConf().set("mongo.input.uri",
>>> "mongodb://localhost:27017/handling.event");
>>>
>>> val input = env.createInput(hdIf);
>>>
>>> def mapfunc(t1: BSONWritable, t2: BSONWritable): String  = {
>>> return t1.toString()
>>> }
>>>
>>> // does not work
>>> //input.map mapfunc
>>>
>>> // does not work either
>>> input.map( (t1: BSONWritable, t2: BSONWritable) => t1 )
>>> // does not work either
>>> //input.map ( (t1, t2) => t1 )
>>>
>>>
>>
>


Re: Sharing Java Collections within Flink Cluster

2016-09-08 Thread Fabian Hueske
Not sure if I got your requirements right, but would this work?

KeyedStream ks1 = ds1.keyBy("*") ;
KeyedStream, String> ks2 = ds2.flatMap(split T into k-v
pairs).keyBy(0);

ks1.connect(ks2).flatMap(X)

X is a CoFlatMapFunction that inserts and removes elements from ks2 into a
key-value state member. Elements from ks1 are matched against that state.
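
A rough (untested) sketch of what X could look like, assuming the values in the
k-v pairs of ks2 are Longs, that a null value acts as the delete marker, and
that both inputs are keyed on the same String key:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class X extends RichCoFlatMapFunction<String, Tuple2<String, Long>, String> {

  private transient ValueState<Long> lookup;

  @Override
  public void open(Configuration parameters) {
    lookup = getRuntimeContext().getState(
        new ValueStateDescriptor<Long>("lookup", Long.class, null));
  }

  // elements of ks1: match against the state of the current key
  @Override
  public void flatMap1(String element, Collector<String> out) throws Exception {
    Long value = lookup.value();
    if (value != null) {
      out.collect(element + " -> " + value);
    }
  }

  // elements of ks2: insert, update, or remove entries
  @Override
  public void flatMap2(Tuple2<String, Long> kv, Collector<String> out) throws Exception {
    if (kv.f1 == null) {
      lookup.update(null); // delete-value record evicts the entry
    } else {
      lookup.update(kv.f1);
    }
  }
}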

Best, Fabian

2016-09-08 20:24 GMT+02:00 Chakravarthy varaga :

> Hi Fabian,
>
>  First of all thanks for all your prompt responses. With regards to 2)
> Multiple looks ups, I have to clarify what I mean by that...
>
>  DS1 elementKeyStream  = stream1.map(String<>); this maps each
> of the streaming elements into string mapped value...
>  DS2  = stream2.xxx(); // where stream2 is a kafka source stream,
> as you proposed.. xxx() should be my function() which splits the string and
> generates key1:, key2:, key3: keyN:
>
>  Now,
> I wish to map elementKeyStream with look ups within (key1,
> key2...keyN) where key1, key2.. keyN and their respective values should be
> available across the cluster...
>
> Thanks a million !
> CVP
>
>
>
>
> On Wed, Sep 7, 2016 at 9:15 PM, Fabian Hueske  wrote:
>
>> That depends.
>> 1) Growing/Shrinking: This should work. New entries can always be
>> inserted. In order to remove entries from the k-v-state you have to set the
>> value to null. Note that you need an explicit delete-value record to
>> trigger the eviction.
>> 2) Multiple lookups: This does only work if all lookups are independent
>> from each other. You can partition DS1 only on a single key and the other
>> keys might be located on different shards. A workaround might be to
>> duplicate S1 events for each key that they need to look up. However, you
>> might need to collect events from the same S1 event after the join. If that
>> does not work for you, the only thing that comes to my mind is to broadcast
>> the state and keep a full local copy in each operator.
>>
>> Let me add one more thing regarding the upcoming rescaling feature. If
>> this is interesting for you, rescaling will also work (maybe not in the
>> first version) for broadcasted state, i.e. state which is the same on all
>> parallel operator instances.
>>
>> 2016-09-07 21:45 GMT+02:00 Chakravarthy varaga 
>> :
>>
>>> I'm understanding this better with your explanation..
>>> With this use case,each element in DS1 has to look up against a
>>> 'bunch of keys' from DS2 and DS2 could shrink/expand in terms of the no.,
>>> of keys will the key-value shard work in this case?
>>>
>>> On Wed, Sep 7, 2016 at 7:44 PM, Fabian Hueske  wrote:
>>>
>>>> Operator state is always local in Flink. However, with key-value state,
>>>> you can have something which behaves kind of similar to a distribute
>>>> hashmap, because each operator holds a different shard/partition of the
>>>> hashtable.
>>>>
>>>> If you have to do only a single key lookup for each element of DS1, you
>>>> should think about partitioning both streams (keyBy) and writing the state
>>>> into Flink's key-value state [1].
>>>>
>>>> This will have several benefits:
>>>> 1) State does not need to be replicated
>>>> 2) Depending on the backend (RocksDB) [2], parts of the state can
>>>> reside on disk. You are not bound to the memory of the JVM.
>>>> 3) Flink takes care of the look-up. No need to have your own hashmap.
>>>> 4) It will only be possible to rescale jobs with key-value state (this
>>>> feature is currently under development).
>>>>
>>>> If using the key-value state is possible, I'd go for that.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>>> apis/streaming/state.html
>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>>>> apis/streaming/state_backends.html
>>>>
>>>> 2016-09-07 19:55 GMT+02:00 Chakravarthy varaga <
>>>> chakravarth...@gmail.com>:
>>>>
>>>>> certainly, what I thought as well...
>>>>> The output of DataStream2 could be in 1000s and there are state
>>>>> updates...
>>>>> reading this topic from the other job, job1, is okie.
>>>>> However, assuming that we maintain this state into a collection, and
>>>>> updating the state (by reading from the topic) in this collection, will
>>>>> this be replicated across the cluster within this job1?

Re: NoClassDefFoundError with ElasticsearchSink on Yarn

2016-09-09 Thread Fabian Hueske
+1
I ran into that issue as well. Would be great to have that in the docs!

2016-09-09 11:49 GMT+02:00 Robert Metzger :

> Hi Steffen,
>
> I think it would be good to add it to the documentation.
> Would you like to open a pull request?
>
>
> Regards,
> Robert
>
>
> On Mon, Sep 5, 2016 at 10:26 PM, Steffen Hausmann <
> stef...@hausmann-family.de> wrote:
>
>> Thanks Aris for your explanation!
>>
>> A guava version mismatch was indeed the problem. But in addition to
>> shading the guava dependencies, I encountered another issue caused by
>> conflicting files in META-INF/services:
>>
>> RemoteTransportException[[Failed to deserialize response of type
>>> [org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse]]];
>>> nested: TransportSerializationException[Failed to deserialize response
>>> of type 
>>> [org.elasticsearch.action.admin.cluster.node.liveness.LivenessResponse]];
>>> nested: ExceptionInInitializerError; nested: IllegalArgumentException[An
>>> SPI class of type org.apache.lucene.codecs.PostingsFormat with name
>>> 'Lucene50' does not exist.  You need to add the corresponding JAR file
>>> supporting this SPI to your classpath.  The current classpath supports the
>>> following names: [es090, completion090, XBloomFilter]];
>>>
>>
>> By adding the following bits to my pom.xml file, the conflicting files
>> are appended instead of overwritten and hence the ElasticSearch sink works
>> as expected:
>>
>> 
>>> <resource>META-INF/services/org.apache.lucene.codecs.Codec</resource>
>>> <resource>META-INF/services/org.apache.lucene.codecs.DocValuesFormat</resource>
>>> <resource>META-INF/services/org.apache.lucene.codecs.PostingsFormat</resource>
>>> 
>>>
>>
>> Maybe this is something that can be added to the documentation?
>>
>> Thanks,
>> Steffen
>>
>> On 01/09/2016 12:22, aris kol wrote:
>>
>>> Classic problem of every uber-jar containing Hadoop dependencies and
>>> being deployed on Yarn.
>>>
>>> What actually happens is that some Hadoop dependency relies on an
>>> old version of guava (11 in this case), which doesn't have the method.
>>> You may have assembled your fat-jar properly, but because Hadoop deps
>>> get introduced to your classpath before your own, you invoke the method
>>> using the guava 11 version of the class.
>>>
>>> I fixed that by adding this line:
>>>
>>>
>>> ++ Seq(assemblyShadeRules in assembly :=
>>> Seq(ShadeRule.rename("com.google.common.**" -> "shaded.@1").inAll))
>>>
>>> on the artefact that gets deployed on flink.
>>>
>>> What this practically does is to shade the guava dependencies. Instead
>>> of containing references to com.google.common your build will reference
>>> shaded.com.google.common and as a result it will use the proper class in
>>> your fat jar.
>>> Get a bit creative with the name (ie use shadedhausmann instead of
>>> shaded) to avoid colliding with external deps shading stuff (something
>>> you have to do when using guava, joda, jackson etc).
>>>
>>> Let me know if this helped.
>>>
>>> Aris
>>>
>>> 
>>> *From:* Steffen Hausmann 
>>> *Sent:* Thursday, September 1, 2016 8:58 AM
>>> *To:* user@flink.apache.org
>>> *Subject:* NoClassDefFoundError with ElasticsearchSink on Yarn
>>>
>>>
>>> Hi there,
>>>
>>> I’m running a flink program that reads from a Kinesis stream and
>>> eventually writes to an Elasticsearch2 sink. When I’m running the
>>> program locally from the IDE, everything seems to work fine, but when
>>> I’m executing the same program on an EMR cluster with Yarn, a
>>> NoClassDefFoundError occurs:java.lang.NoSuchMethodError:
>>>
>>> com.google.common.util.concurrent.MoreExecutors.directExecut
>>> or()Ljava/util/concurrent/Executor;
>>> at
>>> org.elasticsearch.threadpool.ThreadPool.(ThreadPool.java:190)
>>> at
>>> org.elasticsearch.client.transport.TransportClient$Builder.b
>>> uild(TransportClient.java:133)
>>> at
>>> org.apache.flink.streaming.connectors.elasticsearch2.Elastic
>>> searchSink.open(ElasticsearchSink.java:164)
>>> at
>>> org.apache.flink.api.common.functions.util.FunctionUtils.ope
>>> nFunction(FunctionUtils.java:38)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOp
>>> erator.open(AbstractUdfStreamOperator.java:91)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllO
>>> perators(StreamTask.java:376)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(S
>>> treamTask.java:256)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> I’ve installed flink on an EMR cluster from the binary distribution
>>> flink-1.1.1-bin-hadoop27-scala_2.10.tgzand the jar file that is executed
>>> on the cluster is build with mvn clean package(I’ve attached the pom.xml
>>> for reference).
>>>
>>> There is a thread on this list that seems to be related, but I’m afraid
>>> I couldn’t draw any

Re: Why tuples are not ignored after watermark?

2016-09-15 Thread Fabian Hueske
No, this is not possible unless you use an external service such as a
database.
The assigners might run on different machines and Flink does not provide
utilities for r/w shared state.

Best, Fabian

2016-09-15 20:17 GMT+02:00 Saiph Kappa :

> And is it possible to share state across parallel instances with
> AssignerWithPunctuatedWatermarks?
>
> Thanks!
>
> On Wed, Sep 14, 2016 at 9:52 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>> the problem might be that your timestamp/watermark assigner is run in
>> parallel and that only one parallel instance of those operators emits the
>> watermark because only one of those parallel instances sees the element
>> with _3 == 9000. For the watermark to advance at an operator it needs to
>> advance in all upstream operations.
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 9 Sep 2016 at 18:29 Saiph Kappa  wrote:
>>
>>> Hi,
>>>
>>> I have a streaming (event time) application where I am receiving events
>>> with the same assigned timestamp. I receive 1 events in total on a
>>> window of 5 minutes, but I emit water mark when 9000 elements have been
>>> received. This watermark is 6 minutes after the assigned timestamps. My
>>> question is: why the function that is associated with the window reads
>>> 1 elements and not 9000? All elements that have a timestamp lower than
>>> the watermark should be ignored (1000), but it's not happening.
>>>
>>> Here is part of the code:
>>> «
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>> val rawStream = env.socketTextStream("localhost", 4321)
>>>
>>> val punctuatedAssigner = new AssignerWithPunctuatedWatermarks[(String,
>>> Int, Long)] {
>>>   val timestamp = System.currentTimeMillis();
>>>
>>>   override def extractTimestamp(element: (String, Int, Long),
>>> previousElementTimestamp: Long): Long =
>>> timestamp
>>>
>>>   override def checkAndGetNextWatermark(lastElement: (String, Int,
>>> Long), extractedTimestamp: Long): Watermark = {
>>> if(lastElement._3 == 9000) {
>>>   val ts = extractedTimestamp + TimeUnit.MINUTES.toMillis(6)
>>>   new watermark.Watermark(ts)
>>> } else null
>>>   }
>>> }
>>>
>>> val stream = rawStream.map(line => {
>>>   val Array(p1, p2, p3) = line.split(" ")
>>>   (p1, p2.toInt, p3.toLong)
>>> })
>>>   .assignTimestampsAndWatermarks(punctuatedAssigner)
>>>
>>> stream.keyBy(1).timeWindow(Time.of(5, TimeUnit.MINUTES)).apply(function)
>>> »
>>>
>>> Thanks!
>>>
>>
>


Re: Streaming - memory management

2016-09-15 Thread Fabian Hueske
Thanks for the pointer and sorry for the late answer.
I guess that depends on the semantics of "checkpointing". In Flink's
terminology this means creating a copy of the state (and writing the copy
to the external FS). It does not mean that the state is migrated or moved
to the external FS.
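
For reference, this is roughly how the RocksDB backend is wired up (a sketch only;
the checkpoint URI is a placeholder and constructor details may differ between
versions): the working state stays in the local RocksDB instances on the
TaskManagers, and only the periodic checkpoint copies go to the configured URI.

// e.g. inside main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
env.enableCheckpointing(60000); // take a checkpoint (copy of the state) every 60s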

Best, Fabian

2016-09-01 20:53 GMT+02:00 vinay patil :

> Hi Fabian,
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/state_backends.
> html#the-rocksdbstatebackend
>
> I am referring to this, this does not clearly state if the state will be
> maintained in local disk even after checkpointing.
>
> Or I am not getting it correclty :)
>
> Regards,
> Vinay Patil
>
> On Thu, Sep 1, 2016 at 1:38 PM, Fabian Hueske-2 [via Apache Flink User
> Mailing List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=8849&i=0>> wrote:
>
>> You do not have to convert your DTO into a JSON object to use it as a
>> key-value state in a Flink function.
>> You can pass it as it is via the state interfaces.
>>
>> Can you point me to the documentation that you find confusing? The state
>> documentation [1] says:
>>
>> >> You can make *every* transformation (map, filter, etc) stateful by
>> using Flink’s state interface or checkpointing instance fields of your
>> function.
>> >> You can register any instance field as *managed* state by
>> implementing an interface.
>> >> In this case, and also in the case of using Flink’s native state
>> interface, Flink will automatically take consistent snapshots of your state
>> periodically, and restore its value in the case of a failure.
>>
>> Is that unclear/confusing or are you referring to different paragraph?
>>
>> Thanks, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/
>> apis/streaming/state.html
>>
>> 2016-09-01 20:22 GMT+02:00 vinay patil <[hidden email]
>> <http:///user/SendEmail.jtp?type=node&node=8847&i=0>>:
>>
>>> I don't want to join the third stream.
>>>
>>> And Yes, This is what I was thinking of.also :
>>> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>>> backup join)
>>>
>>>
>>> I am already done integrating with Cassandra but I feel RocksDB will be
>>> a better option, I will have to take care of the clearing part as you have
>>> suggested, will check that in documentation.
>>>
>>> I have the DTO with almost 50 fields , converting it to JSON and storing
>>> it as a state should not be a problem , or there is no harm in storing the
>>> DTO ?
>>>
>>> I think the documentation should specify the point that the state will
>>> be maintained for user-defined operators to avoid confusion.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Sep 1, 2016 at 1:12 PM, Fabian Hueske-2 [via Apache Flink User
>>> Mailing List archive.] <[hidden email]
>>> <http:///user/SendEmail.jtp?type=node&node=8845&i=0>> wrote:
>>>
>>>> I thought you would like to join the non-matched elements with another
>>>> (third) stream.
>>>>
>>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.connect(
>>>> s3.keyBy).coFlatMap(// backup join)
>>>>
>>>> If you want to match the non-matched stream with itself a
>>>> FlatMapFunction is the right choice.
>>>>
>>>> --> s1.union(s2).keyBy().window().apply(// outerjoin).keyBy.flatMap(//
>>>> backup join)
>>>>
>>>> The backup join puts all non-match elements in the state and waits for
>>>> another non-matched element with the same key to do the join.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2016-09-01 19:55 GMT+02:00 vinay patil <[hidden email]
>>>> <http:///user/SendEmail.jtp?type=node&node=8843&i=0>>:
>>>>
>>>>> Yes, that's what I am looking for.
>>>>>
>>>>> But why to use CoFlatMapFunction , I have already got the
>>>>> matchingAndNonMatching Stream , by doing the union of two streams and
>>>>> having the logic in apply method for performing outer-join.
>>>>>
>>>>> I am thinking of applying the same key on matchingAndNonMatching and
>>>>> flatmap to take care of rest logic.
>>>>>
>>>>> Or are you suggestion to use Co-FlatMapFunction a

Re: SQL / Tuple question

2016-09-19 Thread Fabian Hueske
Hi Radu,

you can pass the TypeInfo directly without accessing the TypeClass.

Have you tried this?

TypeInformation<Tuple2<String, Integer>> tpinf =
    new TypeHint<Tuple2<String, Integer>>(){}.getTypeInfo();

.toDataStream( , tpinf )


Best, Fabian

2016-09-19 17:53 GMT+02:00 Radu Tudoran :

> Hi,
>
>
>
> I am trying to create an SQL statement that is supposed to return a string
> and an integer
>
>
>
> Mytable.sql(“select mystringfield,myintfield …. )
>
>
>
> I am trying to give the TypeInformation as a class to the
>
>
>
> TypeInformation<Tuple2<String, Integer>> tpinf = new
> TypeHint<Tuple2<String, Integer>>(){}.getTypeInfo();
>
>
>
> .toDataStream( , tpinf.getTypeClass() )
>
>
>
> However I get the following error shown below.
>
> Can someone give me an example of some working tuples for this case.
>
>
>
>
>
>
>
> Exception in thread "main"
> org.apache.flink.api.common.functions.InvalidTypesException: Tuple
> needs to be parameterized by using generics.
>
>  at
>
>
>


Re: Job Stuck in cancel state

2016-09-19 Thread Fabian Hueske
Hi Janardhan,

Not sure what's going wrong here. Maybe Till (in CC) has an idea?

Best, Fabian

2016-09-19 19:45 GMT+02:00 Janardhan Reddy :

> HI,
>
> I cancelled a restarting job from flink UI and the job is stuck in
> cancelling state. (Fixed delay restart strategy was configured for the
> job). The following error message is present in taskmanager logs.
>
> akka.remote.OversizedPayloadException: Discarding oversized payload sent
> to Actor[akka.tcp://flink@10.200.7.245:42589/user/jobmanager#-146176374]:
> max allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.messages.JobManagerMessages$LeaderSessionMessage
> was 20670224 bytes.
>
>
> Does the leader session message here denote the job cancel message which
> is sent to the job manager decorated with leader session id ?
>
> Thread dump of taskmanager:
>
>
> Attaching to process ID 28948, please wait...
>
> Debugger attached successfully.
>
> Server compiler detected.
>
> JVM version is 25.101-b13
>
> Deadlock Detection:
>
>
> java.lang.RuntimeException: Unable to deduce type of thread from address
> 0x7f96e56c9800 (expected type JavaThread, CompilerThread,
> ServiceThread, JvmtiAgentThread, or SurrogateLockerThread)
>
> at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(
> Threads.java:169)
>
> at sun.jvm.hotspot.runtime.Threads.first(Threads.java:153)
>
> at sun.jvm.hotspot.runtime.DeadlockDetector.createThreadTable(
> DeadlockDetector.java:149)
>
> at sun.jvm.hotspot.runtime.DeadlockDetector.print(
> DeadlockDetector.java:56)
>
> at sun.jvm.hotspot.runtime.DeadlockDetector.print(
> DeadlockDetector.java:39)
>
> at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:62)
>
> at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:45)
>
> at sun.jvm.hotspot.tools.JStack.run(JStack.java:66)
>
> at sun.jvm.hotspot.tools.Tool.startInternal(Tool.java:260)
>
> at sun.jvm.hotspot.tools.Tool.start(Tool.java:223)
>
> at sun.jvm.hotspot.tools.Tool.execute(Tool.java:118)
>
> at sun.jvm.hotspot.tools.JStack.main(JStack.java:92)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at sun.tools.jstack.JStack.runJStackTool(JStack.java:140)
>
> at sun.tools.jstack.JStack.main(JStack.java:106)
>
> Caused by: sun.jvm.hotspot.types.WrongTypeException: No suitable match
> for type of address 0x7f96e56c9800
>
> at sun.jvm.hotspot.runtime.InstanceConstructor.newWrongTypeException(
> InstanceConstructor.java:62)
>
> at sun.jvm.hotspot.runtime.VirtualConstructor.instantiateWrapperFor(
> VirtualConstructor.java:80)
>
> at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(
> Threads.java:165)
>
> ... 17 more
>
> Can't print deadlocks:Unable to deduce type of thread from address
> 0x7f96e56c9800 (expected type JavaThread, CompilerThread,
> ServiceThread, JvmtiAgentThread, or SurrogateLockerThread)
>
> Exception in thread "main" java.lang.reflect.InvocationTargetException
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
>
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at sun.tools.jstack.JStack.runJStackTool(JStack.java:140)
>
> at sun.tools.jstack.JStack.main(JStack.java:106)
>
> Caused by: java.lang.RuntimeException: Unable to deduce type of thread
> from address 0x7f96e56c9800 (expected type JavaThread, CompilerThread,
> ServiceThread, JvmtiAgentThread, or SurrogateLockerThread)
>
> at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(
> Threads.java:169)
>
> at sun.jvm.hotspot.runtime.Threads.first(Threads.java:153)
>
> at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:75)
>
> at sun.jvm.hotspot.tools.StackTrace.run(StackTrace.java:45)
>
> at sun.jvm.hotspot.tools.JStack.run(JStack.java:66)
>
> at sun.jvm.hotspot.tools.Tool.startInternal(Tool.java:260)
>
> at sun.jvm.hotspot.tools.Tool.start(Tool.java:223)
>
> at sun.jvm.hotspot.tools.Tool.execute(Tool.java:118)
>
> at sun.jvm.hotspot.tools.JStack.main(JStack.java:92)
>
> ... 6 more
>
> Caused by: sun.jvm.hotspot.types.WrongTypeException: No suitable match
> for type of address 0x7f96e56c9800
>
> at sun.jvm.hotspot.runtime.InstanceConstructor.newWrongTypeException(
> InstanceConstructor.java:62)
>
> at sun.jvm.hotspot.runtime.VirtualConstructor.instantiateWrapperFor(
> VirtualConstructor.java:80)
>
> at sun.jvm.hotspot.runtime.Threads.createJavaThreadWrapper(
> Threads.java:165)
>
> ... 14 more
>


Re: Problem with CEPPatternOperator when taskmanager is killed

2016-09-19 Thread Fabian Hueske
Thanks for looking into this Frank!

I opened FLINK-4636 [1] to track the issue.

Would you or Jaxbihani like to contribute a patch for this bug?

[1] https://issues.apache.org/jira/browse/FLINK-4636
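
The change Frank describes would be along these lines (a sketch only; the
variable names are taken from his description, not from the actual source):

// PriorityQueue rejects an initial capacity of 0, so guard against it
int initialCapacity = Math.max(numberPriorityQueueEntries, 1);
priorityQueue = new PriorityQueue<>(initialCapacity, comparator);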

2016-09-17 21:15 GMT+02:00 Frank Dekervel :

> Hello,
>
> looks like a bug ... when a PriorityQueue is made with initialCapacity
> zero (see PriorityQueue.java) an illegal argument exception is thrown
>
> http://stackoverflow.com/questions/3609342/why-
> priorityqueue-in-java-cannot-have-initialcapacity-0
>
> The fix would be trivial:
>
> https://github.com/apache/flink/blob/master/flink-
> libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/
> AbstractCEPPatternOperator.java#L129:
>
> when numberPriorityQueueEntries equals zero, create the PriorityQueue
> with capacity 1 instead of 0.
>
> greetings,
> Frank
>
>
>
> On Sat, Sep 17, 2016 at 2:56 PM, jaxbihani  wrote:
>
>> Updated attachment containing exceptions  stacktrace
>> > nabble.com/file/n9048/stacktrace.stacktrace>
>>
>>
>>
>> --
>> View this message in context: http://apache-flink-user-maili
>> ng-list-archive.2336050.n4.nabble.com/Problem-with-CEPPat
>> ternOperator-when-taskmanager-is-killed-tp9024p9048.html
>> Sent from the Apache Flink User Mailing List archive. mailing list
>> archive at Nabble.com.
>>
>
>


Re: flink run throws NPE, JobSubmissionResult is null when interactive and not isDetached()

2016-09-19 Thread Fabian Hueske
Hi Luis,

this looks like a bug.
Can you open a JIRA [1] issue and provide a more detailed description of
what you do (Environment, DataStream / DataSet, how do you submit the
program, maybe add a small program that reproduces the problem on your
setup)?

Thanks, Fabian

2016-09-19 17:30 GMT+02:00 Luis Mariano Guerra :

> context: I have two other similar jobs in the same project that run
> without problem.
>
> On Mon, Sep 19, 2016 at 4:28 PM, Luis Mariano Guerra <
> mari...@event-fabric.com> wrote:
>
>> hi
>>
>> submitting a job I get a NPE here:
>> https://github.com/apache/flink/blob/master/flink-clients/
>> src/main/java/org/apache/flink/client/CliFrontend.java#L781
>>
>> building from source and adding some prints I got that 
>> this.lastJobExecutionResult
>> here seems to be null: https://github.com/apache/flin
>> k/blob/master/flink-clients/src/main/java/org/apache/
>> flink/client/program/ClusterClient.java#L329
>>
>> any hint of what may I be doing wrong for this to fail like this?
>>
>
>


Re: Serialization problem for Guava's TreeMultimap

2016-09-20 Thread Fabian Hueske
Hi Yukun,

I debugged this issue and found that this is a bug in the serialization of
the StateDescriptor.
I have created FLINK-4640 [1] to resolve the issue.

Thanks for reporting the issue.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4640

2016-09-20 10:35 GMT+02:00 Yukun Guo :

> Some detail: if running the FoldFunction on a KeyedStream, everything
> works fine. So it must relate to the way WindowedStream handles type
> extraction.
>
> In case any Flink experts would like to reproduce it, I have created a
> repo on Github: github.com/gyk/flink-multimap
>
> On 20 September 2016 at 10:33, Yukun Guo  wrote:
>
>> Hi,
>>
>> The same error occurs after changing the code, unfortunately.
>>
>> BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T
>> serializer` where T extends Serializer & Serializable, so I pass a
>> custom GenericJavaSerializer, but I guess this doesn't matter much.
>>
>>
>> On 19 September 2016 at 18:02, Stephan Ewen  wrote:
>>
>>> Hi!
>>>
>>> Can you use 
>>> "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, 
>>> JavaSerializer.class)"
>>> ?
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo  wrote:
>>>
 Here is the code snippet:

 windowedStream.fold(TreeMultimap.create(), new 
 FoldFunction, TreeMultimap>() {
 @Override
 public TreeMultimap fold(TreeMultimap 
 topKSoFar,
Tuple2 itemCount) 
 throws Exception {
 String item = itemCount.f0;
 Long count = itemCount.f1;
 topKSoFar.put(count, item);
 if (topKSoFar.keySet().size() > topK) {
 topKSoFar.removeAll(topKSoFar.keySet().first());
 }
 return topKSoFar;
 }
 });


 The problem is when fold function getting called, the initial value has
 lost therefore it encounters a NullPointerException. This is due to failed
 type extraction and serialization, as shown in the log message:
 "INFO  TypeExtractor:1685 - No fields detected for class
 com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
 Will be handled as GenericType."

 I have tried the following two ways to fix it but neither of them
 worked:

 1. Writing a class TreeMultimapSerializer which extends Kryo's
 Serializer, and calling 
 `env.addDefaultKryoSerializer(TreeMultimap.class,
 new TreeMultimapSerializer()`. The write/read methods are almost
 line-by-line translations from TreeMultimap's own implementation.

 2. TreeMultimap has implemented Serializable interface so Kryo can fall
 back to use the standard Java serialization. Since Kryo's JavaSerializer
 itself is not serializable, I wrote an adapter to make it fit the
 "addDefaultKryoSerializer" API.

 Could you please give me some working examples for custom Kryo
 serialization in Flink?


 Best regards,
 Yukun


>>>
>>
>


Re: Flink JDBCOutputFormat logs wrong WARN message

2016-09-20 Thread Fabian Hueske
Yes, the condition needs to be fixed.
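
For reference, the corrected check that Swapnil suggests would look like this:

if (typesArray != null && typesArray.length > 0
        && typesArray.length != row.productArity()) {
    LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
}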

@Swapnil, would you like to create a JIRA issue and open a pull request to
fix it?

Thanks, Fabian

2016-09-20 11:22 GMT+02:00 Chesnay Schepler :

> I would agree that the condition should be changed.
>
>
> On 20.09.2016 10:52, Swapnil Chougule wrote:
>
>> I checked following code in Flink JDBCOutputFormat while I was using in
>> my project work. I found following snippet:
>>
>> @Override
>> public void writeRecord(Row row) throws IOException {
>>
>> if (typesArray != null && typesArray.length > 0 &&
>> typesArray.length == row.productArity()) {
>> LOG.warn("Column SQL types array doesn't match arity of
>> passed Row! Check the passed array...");
>> }
>> ...
>> }
>>
>> I am finding this "if" condition wrong. It should be
>>
>> if (typesArray != null && typesArray.length > 0 && typesArray.length !=
>> row.productArity())
>>
>> As a result, it is logging wrong warning in my logs which is incorrect.
>> (Even if typearray matches arity of passed row)
>> May I have inputs on same ?
>>
>> Thanks,
>> Swapnil
>>
>
>


Re: Simple batch job hangs if run twice

2016-09-23 Thread Fabian Hueske
Hi Yassine, can you share a stacktrace of the job when it got stuck?

Thanks, Fabian

2016-09-22 14:03 GMT+02:00 Yassine MARZOUGUI :

> The input splits are correctly assgined. I noticed that whenever the job
> is stuck, that is because the task *Combine (GroupReduce at
> first(DataSet.java:573)) *keeps RUNNING and never switches to FINISHED.
> I tried to debug the program at the *first(100), *but I couldn't do much.
> I attahced the full DEBUG output.
>
> 2016-09-22 12:10 GMT+02:00 Robert Metzger :
>
>> Can you try running with DEBUG logging level?
>> Then you should see if input splits are assigned.
>> Also, you could try to use a debugger to see what's going on.
>>
>> On Mon, Sep 19, 2016 at 2:04 PM, Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>
>>> Hi Chensey,
>>>
>>> I am running Flink 1.1.2, and using NetBeans 8.1.
>>> I made a screencast reproducing the problem here:
>>> http://recordit.co/P53OnFokN4 .
>>>
>>> Best,
>>> Yassine
>>>
>>>
>>> 2016-09-19 10:04 GMT+02:00 Chesnay Schepler :
>>>
 No, I can't recall that i had this happen to me.

 I would enable logging and try again, as well as checking whether the
 second job is actually running through the WebInterface.

 If you tell me your NetBeans version i can try to reproduce it.

 Also, which version of Flink are you using?


 On 19.09.2016 07:45, Aljoscha Krettek wrote:

 Hmm, this sound like it could be IDE/Windows specific, unfortunately I
 don't have access to a windows machine. I'll loop in Chesnay how is using
 windows.

 Chesnay, do you maybe have an idea what could be the problem? Have you
 ever encountered this?

 On Sat, 17 Sep 2016 at 15:30 Yassine MARZOUGUI <
 y.marzou...@mindlytix.com> wrote:

> Hi Aljoscha,
>
> Thanks for your response. By the first time I mean I hit run from the
> IDE (I am using Netbeans on Windows) the first time after building the
> program. If then I stop it and run it again (without rebuidling) It is
> stuck in the state RUNNING. Sometimes I have to rebuild it, or close the
> IDE to be able to get an output. The behaviour is random, maybe it's
> related to the IDE or the OS and not necessarily Flink itself.
>
> On Sep 17, 2016 15:16, "Aljoscha Krettek"  wrote:
>
>> Hi,
>> when is the "first time". It seems you have tried this repeatedly so
>> what differentiates a "first time" from the other times? Are you closing
>> your IDE in-between or do you mean running the job a second time within 
>> the
>> same program?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 9 Sep 2016 at 16:40 Yassine MARZOUGUI <
>> y.marzou...@mindlytix.com> wrote:
>>
>>> Hi all,
>>>
>>> When I run the following batch job inside the IDE for the first
>>> time, it outputs results and switches to FINISHED, but when I run it 
>>> again
>>> it is stuck in the state RUNNING. The csv file size is 160 MB. What 
>>> could
>>> be the reason for this behaviour?
>>>
>>> public class BatchJob {
>>>
>>> public static void main(String[] args) throws Exception {
>>> final ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();
>>>
>>> env.readCsvFile("dump.csv")
>>> .ignoreFirstLine()
>>> .fieldDelimiter(";")
>>> .includeFields("111000")
>>> .types(String.class, String.class, String.class)
>>> .first(100)
>>> .print();
>>>
>>> }
>>> }
>>>
>>> Best,
>>> Yassine
>>>
>>

>>>
>>
>


Re: Simple batch job hangs if run twice

2016-09-23 Thread Fabian Hueske
Yes, log files and stacktraces are different things.
A stacktrace shows the call hierarchy of all threads in a JVM at the time
when it is taken. So you can see the method that is currently executed (and
from where it was called) when the stacktrace is taken. In case of a
deadlock, you see where the program is waiting.

The stack you sent is only a part of the complete stacktrace. Most IDEs
have a feature to take a stacktrace while they are executing a program.

2016-09-23 11:43 GMT+02:00 Yassine MARZOUGUI :

> Hi Fabian,
>
> Not sure if this answers your question, here is the stack I got when
> debugging the combine and datasource operators when the job got stuck:
>
> "DataSource (at main(BatchTest.java:28) 
> (org.apache.flink.api.java.io.TupleCsvInputFormat))
> (1/8)"
> at java.lang.Object.wait(Object.java)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:133)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:93)
> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(
> OutputCollector.java:65)
> at org.apache.flink.runtime.operators.util.metrics.
> CountingCollector.collect(CountingCollector.java:35)
> at org.apache.flink.runtime.operators.DataSourceTask.
> invoke(DataSourceTask.java:163)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> "Combine (GroupReduce at first(DataSet.java:573)) (1/8)"
> at java.lang.Object.wait(Object.java)
> at org.apache.flink.runtime.io.network.buffer.
> LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
> at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.
> requestBufferBlocking(LocalBufferPool.java:133)
> at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(
> RecordWriter.java:93)
> at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(
> OutputCollector.java:65)
> at org.apache.flink.api.java.functions.FirstReducer.reduce(
> FirstReducer.java:41)
> at org.apache.flink.api.java.functions.FirstReducer.
> combine(FirstReducer.java:52)
> at org.apache.flink.runtime.operators.AllGroupReduceDriver.run(
> AllGroupReduceDriver.java:152)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
>
> Best,
> Yassine
>
>
> 2016-09-23 11:28 GMT+02:00 Yassine MARZOUGUI :
>
>> Hi Fabian,
>>
>> Is it different from the output I already sent? (see attached file). If
>> yes, how can I obtain the stacktrace of the job programmatically? Thanks.
>>
>> Best,
>> Yassine
>>
>> 2016-09-23 10:55 GMT+02:00 Fabian Hueske :
>>
>>> Hi Yassine, can you share a stacktrace of the job when it got stuck?
>>>
>>> Thanks, Fabian
>>>
>>> 2016-09-22 14:03 GMT+02:00 Yassine MARZOUGUI 
>>> :
>>>
>>>> The input splits are correctly assgined. I noticed that whenever the
>>>> job is stuck, that is because the task *Combine (GroupReduce at
>>>> first(DataSet.java:573)) *keeps RUNNING and never switches to FINISHED.
>>>> I tried to debug the program at the *first(100), *but I couldn't do
>>>> much. I attahced the full DEBUG output.
>>>>
>>>> 2016-09-22 12:10 GMT+02:00 Robert Metzger :
>>>>
>>>>> Can you try running with DEBUG logging level?
>>>>> Then you should see if input splits are assigned.
>>>>> Also, you could try to use a debugger to see what's going on.
>>>>>
>>>>> On Mon, Sep 19, 2016 at 2:04 PM, Yassine MARZOUGUI <
>>>>> y.marzou...@mindlytix.com> wrote:
>>>>>
>>>>>> Hi Chensey,
>>>>>>
>>>>>> I am running Flink 1.1.2, and using NetBeans 8.1.
>>>>>> I made a screencast reproducing the problem here:
>>>>>> http://recordit.co/P53OnFokN4 <http://recordit.co/VRBpBlb51A>.
>>>>>>
>>>>>> Best,
>>>>>> Yassine
>>>>>>
>>>>>>
>>>>>> 2016-09-19 10:04 GMT+02:00 Chesnay Schepler :
>>>>>>
>>>>>>> No, I can't recall that i had this happen to me.
>>>>>>>
>>>>>>> I would enable logging and try again, as 

Re: Flink Checkpoint runs slow for low load stream

2016-09-23 Thread Fabian Hueske
Hi CVP,

I'm not so familiar with the internals of the checkpointing system,
but maybe Stephan (in CC) has an idea what's going on here.

Best, Fabian

2016-09-23 11:33 GMT+02:00 Chakravarthy varaga :

> Hi Aljoscha & Fabian,
>
> I have a stream application that has 2 stream source as below.
>
>  KeyedStream *ks1* = ds1.keyBy("*") ;
>  KeyedStream, String> *ks2* = ds2.flatMap(split T
> into k-v pairs).keyBy(0);
>
>  ks1.connect(ks2).flatMap(X);
>  //X is a CoFlatMapFunction that inserts and removes elements from ks2
> into a key-value state member. Elements from ks1 are matched against that
> state. the CoFlatMapFunction operator maintains ValueState Long>>;
>
>  //ks1 is streaming about 100K events/sec from kafka topic
>  //ks2 is streaming about 1 event every 10 minutes... Precisely when
> the 1st event is consumed from this stream, checkpoint takes 2 minutes
> straight away.
>
> The version of flink is 1.1.2.
>
> I tried to use checkpoint every 10 Secs using a FsStateBackend... What I
> notice is that the checkpoint duration is almost 2 minutes for many cases,
> while for the other cases it varies from 100 ms to 1.5 minutes frequently.
> I'm attaching the snapshot of the dashboard for your reference.
>
>  Is this an issue with flink checkpointing?
>
>  Best Regards
> CVP
>


Re: Complex batch workflow needs (too) much time to create executionPlan

2016-09-26 Thread Fabian Hueske
Hi Markus,

thanks for the stacktraces!
The client is indeed stuck in the optimizer. I have to look a bit more into
this.
Did you try to set JoinHints in your plan? That should reduce the plan
space that is enumerated and therefore reduce the optimization time (maybe
enough to run your application as a single program).
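
For example, a hint can be passed directly to a join (a sketch with placeholder
data sets and join function; pick the strategy that matches your data sizes):

largeDataSet
    .join(smallDataSet, JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND)
    .where(0)
    .equalTo(0)
    .with(new MyJoinFunction());

The joinWithTiny()/joinWithHuge() shortcuts have the same effect for the
broadcast strategies.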

Nonetheless, I think we should look into the optimizer and check how we can
improve the plan enumeration.
I created the JIRA issue FLINK-4688 [1] to track this issue.

Thanks for reporting this,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-4688



2016-09-26 17:58 GMT+02:00 Markus Nentwig :

> Hi Fabian,
>
> at first, sorry for the late answer. The given execution plan was created
> after 20 minutes, only one vertex centric iteration is missing. I can
> optimize the program because some operators are only needed to create
> intermediate debug results, still, it's not enough to run as one Flink job.
> My "solution" is currently that I split the program in several parts and
> execute them on their own writing intermediate results to disk, which is
> working.
>
> As for the stacktrace, I created some of them for the "big" workflow which
> does not finish the optimization phase at all, here are three of them:
> shortly after start: http://pastebin.com/i7SMGbxa
> after ~32 minutes: http://pastebin.com/yJrYETxi
> after ~76minutes: http://pastebin.com/fCsv8bie
>
> I am no expert in analyzing these stacktraces, but perhaps it's helping in
> some way for you!? ;)
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Complex-batch-
> workflow-needs-too-much-time-to-create-executionPlan-tp8596p9177.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: TaskManager & task slots

2016-09-26 Thread Fabian Hueske
Hi Buvana,

A TaskManager runs as a single JVM process. A TaskManager provides a
certain number of processing slots. Slots do not guard CPU time, IO, or JVM
memory. At the moment they only isolate managed memory which is only used
for batch processing. For streaming applications their only purpose is to
limit the number of parallel threads that can be executed by a TaskManager.

In each processing slot, a full slice of a program can be executed, i.e.,
one parallel subtask of each operator of a program. Given a simple program
(source -> map -> sink), a slot can process one subtask of the source, the
mapper, and the sink (it is possible to split a program to be executed in
more slots). Each operator can be executed as a separate thread. However,
in many situations, operators are chained within the same thread to improve
performance (again it is possible to disallow chaining).
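
To make this concrete, here is a small sketch (the operators are placeholders) of
how parallelism and chaining are controlled; the number of slots per TaskManager
itself is set with taskmanager.numberOfTaskSlots in flink-conf.yaml:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4); // default parallelism for all operators

env.addSource(new MySource())    // 4 parallel source subtasks
   .flatMap(new MyFlatMap())     // chained to the source where possible
   .keyBy(0)                     // network shuffle, breaks the chain
   .reduce(new MyReduce())
   .disableChaining()            // force this operator into its own thread
   .addSink(new MySink())
   .setParallelism(1);           // a single sink subtask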

Let me know if you have more questions,
Fabian

2016-09-26 20:31 GMT+02:00 Ramanan, Buvana (Nokia - US) <
buvana.rama...@nokia-bell-labs.com>:

> Hello,
>
>
>
> I would like to understand the following better:
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/setup/config.html#configuring-taskmanager-processing-slots
>
>
>
> Fundamental question – what is the notion of Task Slot? Does it correspond
> to one JVM? Or the Task Manager itself corresponds to one JVM?
>
> Example-1 shows a parallelism of 1 and has 3 operators – flatMap, Reduce &
> Sink. Here comes the question – are these 3 operators running a separate
> threads within a JVM?
>
>
>
> Sorry for the naïve questions. I studied the following links and could not
> get a clear answer:
>
> https://ci.apache.org/projects/flink/flink-docs-
> release-1.1/internals/general_arch.html
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.1/internals/job_
> scheduling.html
>
>
>
> Are there more documents under Flink’s wiki site / elsewhere? Please point
> me to more info on the architecture.
>
>
>
> thank you,
>
> regards,
>
> Buvana
>
>
>


Re: Flink-HBase connectivity through Kerberos keytab | Simple get/put sample implementation

2016-09-29 Thread Fabian Hueske
Hi Anchit,

Flink does not yet have a streaming sink connector for HBase. Some members
of the community are working on this though [1].
I think we resolved a similar issue for the Kafka connector recently [2].
Maybe the related commits contain some relevant code for your problem.

Best, Fabian

[1] https://github.com/apache/flink/pull/2332
[2] https://issues.apache.org/jira/browse/FLINK-3239

2016-09-29 9:12 GMT+02:00 Anchit Jatana :

> Hi All,
>
> I'm trying to link my flink application with HBase for simple read/write
> operations. I need to implement Flink to HBase the connectivity through
> Kerberos using the keytab.
>
> Can somebody share(or link me to some resource) a sample
> code/implementation on how to achieve Flink to HBase connectivity through
> Kerberos using keytab for simple read/write (get/put) operation.
>
> Thank you!
>
> Regards,
> Anchit
>


Re: Iterations vs. combo source/sink

2016-09-29 Thread Fabian Hueske
Hi Ken,

you can certainly have partitioned sources and sinks. You can control the
parallelism by calling the .setParallelism() method.
If you need a partitioned sink, you can call .keyBy() to hash partition.
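
As a sketch (the source and sink classes are placeholders for a wrapper around
your DB):

DataStream<Record> updates = env
    .addSource(new MyDbSource())
    .setParallelism(4);    // 4 parallel source instances

updates
    .keyBy("partitionKey") // hash-partition by key before the sink
    .addSink(new MyDbSink())
    .setParallelism(4);    // 4 parallel sink instances, each receiving a disjoint set of keys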

I did not completely understand the requirements of your program. Can you
maybe provide pseudo code for what the program should look like?

Some general comments:
- Flink's fault tolerance mechanism does not work with iterative data flows
yet. This is work in progress, see FLINK-3257 [1].
- Flink's fault tolerance mechanism only works if you expose all (!)
internal operator state. So you would need to put your Java DB in Flink
state to have a recoverable job.
- Is the DB essential in your application? Could you use Flink's
key-partitioned state interface instead? That would help to make your job
fault-tolerant.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-3257
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface

2016-09-29 1:15 GMT+02:00 Ken Krugler :

> Hi all,
>
> I’ve got a very specialized DB (runs in the JVM) that I need to use to
> both keep track of state and generate new records to be processed by my
> Flink streaming workflow. Some of the workflow results are updates to be
> applied to the DB.
>
> And the DB needs to be partitioned.
>
> My initial approach is to wrap it in a regular operator, and have
> subsequent streams be inputs for updating state. So now I’ve got an
> IterativeDataStream, which should work.
>
> But I imagine I could also wrap this DB in a source and a sink, yes?
> Though I’m not sure how I could partition it as a source, in that case.
>
> If it is feasible to have a partitioned source/sink, are there general
> pros/cons to either approach?
>
> Thanks,
>
> — Ken
>
>


Re: AW: Problem with CEPPatternOperator when taskmanager is killed

2016-09-29 Thread Fabian Hueske
Great, thanks!
I gave you contributor permissions in JIRA. You can now also assign issues
to yourself if you decide to continue to contribute.

Best, Fabian

2016-09-29 16:48 GMT+02:00 jaxbihani :

> Hi Fabian
>
> My JIRA user is: jaxbihani
> I have created a pull request for the fix :
> https://github.com/apache/flink/pull/2568
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Problem-with-
> CEPPatternOperator-when-taskmanager-is-killed-tp9024p9246.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Events B33/35 in Parallel Streams Diagram

2016-09-29 Thread Fabian Hueske
Hi Neil,

"B" only refers to the key-part of the record, the number is the timestamp
(as you assumed). The payload of the record is not displayed in the
figure. So B35 and B31 are two different records with identical key.
The keyBy() operation sends all records with the same key to the same
subtask.

Does that answer your question?

Best, Fabian

2016-09-29 17:22 GMT+02:00 Neil Derraugh <
neil.derra...@intellifylearning.com>:

> Hi,
>
> I’m confused about the meaning of event(s?) B33 and B35 in the Parallel
> Streams Diagram (https://ci.apache.org/projects/flink/flink-docs-
> master/dev/event_time.html#watermarks-in-parallel-streams). Why are there
> are two events with the same id on the diagram?  Is this supposed to
> represent an event emitted twice from the source with differing timestamps?
>
> Thanks,
> Neil
>


Re: Events B33/35 in Parallel Streams Diagram

2016-09-29 Thread Fabian Hueske
Sure, that would be great!
Thanks!

2016-09-29 17:43 GMT+02:00 Neil Derraugh <
neil.derra...@intellifylearning.com>:

> Hi Fabian,
>
> Yes.  Thanks!  I think it would be helpful to indicate that on the graph.
> Call it “key” or “key_id" instead of just “id”, as it is in fact the key of
> the stream and not the id of the event?  Probably seems trivial, but I
> struggled with this one. haha.  I’ll submit a PR for the docs if there’s
> interest.
>
> Neil
>
> On Sep 29, 2016, at 11:36 AM, Fabian Hueske  wrote:
>
> Hi Neil,
>
> "B" only refers to the key-part of the record, the number is the timestamp
> (as you assumed out). The payload of the record is not displayed in the
> figure. So B35 and B31 are two different records with identical key.
> The keyBy() operation sends all records with the same key to the same
> subtask.
>
> Does that answer you question?
>
> Best, Fabian
>
> 2016-09-29 17:22 GMT+02:00 Neil Derraugh  intellifylearning.com>:
>
>> Hi,
>>
>> I’m confused about the meaning of event(s?) B33 and B35 in the Parallel
>> Streams Diagram (https://ci.apache.org/project
>> s/flink/flink-docs-master/dev/event_time.html#watermarks-in-
>> parallel-streams). Why are there are two events with the same id on the
>> diagram?  Is this supposed to represent an event emitted twice from the
>> source with differing timestamps?
>>
>> Thanks,
>> Neil
>>
>
>
>


Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Fabian Hueske
Hi Simone,

I think I have a solution for your problem:

val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)

val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater) // StateUpdater is a stateful FlatMapFunction. It has a
  // keyed state that stores the last state of each id. For each input record it
  // returns two records: (oldState, -1), (newState, +1)

stateChanges ensures that counts of previous states are subtracted.

val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges // result: (state, cntUpdate, time)
  .keyBy(_._1) // key by state
  .window(...) // your window, should be non-overlapping, so go for instance for Tumbling
  .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums the cntUpdates
  // and YourWindowFunction assigns the timestamp of your window

this step aggregates all state changes for each state in a window

val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count, time)
  .keyBy(_._1) // key by state
  .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has a keyed
  // state that stores the current count. For each incoming record, the count is
  // adjusted and a record (state, newCount, time) is emitted.

Now you have the new counts for your states in multiple records. If
possible, you can update your Elasticsearch index using these. Otherwise,
you have to collect them into one record using another window.
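
For illustration, a rough (untested) sketch of what StateUpdater could look
like; the Int.MinValue sentinel for "no previous state" is just one possible
choice:

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

class StateUpdater extends RichFlatMapFunction[(Long, Int, Long), (Int, Int)] {

  private val NoState = Int.MinValue
  private var lastState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    lastState = getRuntimeContext.getState(
      new ValueStateDescriptor("lastState", createTypeInformation[Int], NoState))
  }

  override def flatMap(in: (Long, Int, Long), out: Collector[(Int, Int)]): Unit = {
    val previous = lastState.value()
    if (previous != NoState) {
      out.collect((previous, -1)) // retract the count of the old state
    }
    out.collect((in._2, +1)) // add to the count of the new state
    lastState.update(in._2)
  }
}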

Also note, that the state size of this program depends on the number of
unique ids. That might cause problems if the id space grows very fast.

Please let me know, if you have questions or if that works ;-)

Cheers, Fabian

2016-09-30 0:32 GMT+02:00 Simone Robutti :

> Hello,
>
> in the last few days I tried to create my first real-time analytics job in
> Flink. The approach is kappa-architecture-like, so I have my raw data on
> Kafka where we receive a message for every change of state of any entity.
>
> So the messages are of the form
>
> (id,newStatus, timestamp)
>
> We want to compute, for every time window, the count of items in a given
> status. So the output should be of the form
>
> (outputTimestamp, state1:count1,state2:count2 ...)
>
> or equivalent. These rows should contain, at any given time, the count of
> the items in a given status, where the status associated to an Id is the
> most recent message observed for that id. The status for an id should be
> counted in any case, even if the event is way older than those getting
> processed. So the sum of all the counts should be equal to the number of
> different IDs observed in the system. The following step could be
> forgetting about the items in a final item after a while, but this is not a
> strict requirement right now.
>
> This will be written on elasticsearch and then queried.
>
> I tried many different paths and none of them completely satisfied the
> requirement. Using a sliding window I could easily achieve the expected
> behaviour, except that when the beginning of the sliding window surpassed
> the timestamp of an event, it was lost for the count, as you may expect.
> Others approaches failed to be consistent when working with a backlog
> because I did some tricks with keys and timestamps that failed when the
> data was processed all at once.
>
> So I would like to know, even at an high level, how should I approach this
> problem. It looks like a relatively common use-case but the fact that the
> relevant information for a given ID must be retained indefinitely to count
> the entities correctly creates a lot of problems.
>
> Thank you in advance,
>
> Simone
>
>


Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Fabian Hueske
This works with event time as well. You need to set the right
TimeCharacteristic on the exec env and assign timestamps + watermarks. The
only time-dependent operation is the window. YourWindowFunction assigns the
timestamp of the window. WindowFunction.apply() has a TimeWindow parameter
that gives access to the window's start and end time. See the docs [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/windows.html#windowfunction-with-incremental-aggregation
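
To make that concrete, YourWindowFunction could look roughly like this,
assuming the (state, cntUpdate) tuples from my earlier mail and using the
window's end time as the timestamp of the emitted record:

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class YourWindowFunction
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {

  override def apply(
      key: Int,
      window: TimeWindow,
      input: Iterable[(Int, Int)],
      out: Collector[(Int, Int, Long)]): Unit = {
    // with the incremental SumReducer, input holds a single pre-aggregated record
    val (state, cntUpdate) = input.iterator.next()
    // attach the window's end time as the event-time timestamp of the result
    out.collect((state, cntUpdate, window.getEnd))
  }
}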

2016-09-30 11:00 GMT+02:00 Simone Robutti :

> I'm working with your suggestions, thank you very much. What I'm missing
> here is what YourWindowFunction should do. I have no notion of event time
> there and so I can't assign a timestamp. Also this solution seems to be
> working by processing time, while I care about event time. I couldn't make
> it run yet but for what I got, this is slightly different from what I need.
>
> 2016-09-30 10:04 GMT+02:00 Fabian Hueske :
>
>> Hi Simone,
>>
>> I think I have a solution for your problem:
>>
>> val s: DataStream[(Long, Int, Long)] = ??? // (id, state, time)
>>
>> val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
>>   .keyBy(_._1) // key by id
>>   .flatMap(new StateUpdater) // StateUpdater is a stateful
>> FlatMapFunction. It has a keyed state that stores the last state of each
>> id. For each input record it returns two records: (oldState, -1),
>> (newState, +1)
>>
>> stateChanges ensures that counts of previous states are subtracted.
>>
>> val changesPerWindow: DataStream[(Int, Int, Long)] = stateChanges //
>> (state, cntUpdate, time)
>>   .keyBy(_._1) // key by state
>>   .window() // your window, should be non-overlapping, so go for instance
>> for Tumbling
>>   .apply(new SumReducer(), new YourWindowFunction()) // SumReducer sums
>> the cntUpdates and YourWindowFunction assigns the timestamp of your window
>>
>> this step aggregates all state changes for each state in a window
>>
>> val stateCnts: DataStream[(Int, Int, Long)] = changesPerWindow // (state, count,
>> time)
>>   .keyBy(_._1) // key by state again
>>   .map(new CountUpdater) // CountUpdater is a stateful MapFunction. It has
>> a keyed state that stores the current count. For each incoming record, the
>> count is adjusted and a record (state, newCount, time) is emitted.
>>
>> Now you have the new counts for your states in multiple records. If
>> possible, you can update your Elasticsearch index using these. Otherwise,
>> you have to collect them into one record using another window.
>>
>> Also note, that the state size of this program depends on the number of
>> unique ids. That might cause problems if the id space grows very fast.
>>
>> Please let me know, if you have questions or if that works ;-)
>>
>> Cheers, Fabian
>>
>>
>> 2016-09-30 0:32 GMT+02:00 Simone Robutti :
>>
>>> Hello,
>>>
>>> in the last few days I tried to create my first real-time analytics job
>>> in Flink. The approach is kappa-architecture-like, so I have my raw data on
>>> Kafka where we receive a message for every change of state of any entity.
>>>
>>> So the messages are of the form
>>>
>>> (id,newStatus, timestamp)
>>>
>>> We want to compute, for every time window, the count of items in a given
>>> status. So the output should be of the form
>>>
>>> (outputTimestamp, state1:count1,state2:count2 ...)
>>>
>>> or equivalent. These rows should contain, at any given time, the count
>>> of the items in a given status, where the status associated to an Id is the
>>> most recent message observed for that id. The status for an id should be
>>> counted in any case, even if the event is way older than those getting
>>> processed. So the sum of all the counts should be equal to the number of
>>> different IDs observed in the system. The following step could be
>>> forgetting about the items in a final item after a while, but this is not a
>>> strict requirement right now.
>>>
>>> This will be written on elasticsearch and then queried.
>>>
>>> I tried many different paths and none of them completely satisfied the
>>> requirement. Using a sliding window I could easily achieve the expected
>>> behaviour, except that when the beginning of the sliding window surpassed
>>> the timestamp of an event, it was lost for the count, as you may expect.
>>> Others approaches failed to be consistent when working with a backlog
>>> because I did some tricks with keys and timestamps that failed when the
>>> data was processed all at once.
>>>
>>> So I would like to know, even at an high level, how should I approach
>>> this problem. It looks like a relatively common use-case but the fact that
>>> the relevant information for a given ID must be retained indefinitely to
>>> count the entities correctly creates a lot of problems.
>>>
>>> Thank you in advance,
>>>
>>> Simone
>>>
>>>
>>
>


Re: Using Flink and Cassandra with Scala

2016-10-04 Thread Fabian Hueske
FYI: FLINK-4497 [1] requests Scala tuple and case class support for the
Cassandra sink and was opened about a month ago.

[1] https://issues.apache.org/jira/browse/FLINK-4497

2016-09-30 23:14 GMT+02:00 Stephan Ewen :

> How hard would it be to add case class support?
>
> Internally, tuples and case classes are treated quite similar, so I think
> it could be a quite simple extension...
>
> On Fri, Sep 30, 2016 at 4:22 PM, Sanne de Roever <
> sanne.de.roe...@gmail.com> wrote:
>
>> Thanks Chesnay. Have a good weekend.
>>
>> On Thu, Sep 29, 2016 at 5:03 PM, Chesnay Schepler 
>> wrote:
>>
>>> the cassandra sink only supports java tuples and POJO's.
>>>
>>>
>>> On 29.09.2016 16:33, Sanne de Roever wrote:
>>>
 Hi,

 Does the Cassandra sink support Scala and case classes? It looks like
 using Java is at the moment best practice.

 Cheers,

 Sanne

>>>
>>>
>>
>


Re: Enriching a tuple mapped from a datastream with data coming from a JDBC source

2016-10-04 Thread Fabian Hueske
Hi Philipp,

If I got your requirements right you would like to:
1) load an initial hashmap via JDBC
2) update the hashmap from a stream
3) use the hashmap to enrich another stream.

You can use a CoFlatMap to do this:

stream1.connect(stream2).flatMap(new YourCoFlatMapFunction())

YourCoFlatMapFunction should extend RichCoFlatMapFunction. The initial
hashmap load can be done in the open() method.
A CoFlatMapFunction has two inputs and two flatMap methods, one for each
input. One method can update the hashmap, the other can enrich the second
stream with data from the hashmap.
The two methods are never called concurrently, and the order in which they
are called depends on which data is available.
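
A rough sketch of such a function (all types, the query, and the JDBC URL are
placeholders for your actual setup):

import java.sql.DriverManager
import scala.collection.mutable
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

// input 1: (key, value) updates, input 2: (key, payload) events to enrich
class EnrichFunction
    extends RichCoFlatMapFunction[(String, String), (String, Long), (String, Long, String)] {

  private val lookup = mutable.HashMap[String, String]()

  override def open(parameters: Configuration): Unit = {
    // initial load via plain JDBC
    val conn = DriverManager.getConnection("jdbc:mysql://host/db", "user", "pwd")
    try {
      val rs = conn.createStatement().executeQuery("SELECT k, v FROM lookup_table")
      while (rs.next()) {
        lookup += rs.getString(1) -> rs.getString(2)
      }
    } finally {
      conn.close()
    }
  }

  // updates from the first stream modify the hashmap
  override def flatMap1(update: (String, String), out: Collector[(String, Long, String)]): Unit = {
    lookup += update._1 -> update._2
  }

  // records of the second stream are enriched from the hashmap
  override def flatMap2(event: (String, Long), out: Collector[(String, Long, String)]): Unit = {
    out.collect((event._1, event._2, lookup.getOrElse(event._1, "n/a")))
  }
}

Note that each parallel instance of the function holds its own copy of the
hashmap, so if you run it with parallelism > 1 you should keyBy() both inputs
on the lookup key so that matching records end up in the same instance.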

In general, it is not possible to share local operator state among
different operators (or even parallel instance of the same operator).

Hope this helps,
Fabian


Re: Presented Flink use case in Japan

2016-10-04 Thread Fabian Hueske
Thanks Hironori for sharing these excellent news!
Do you think it would be possible to add your use case to Flink's
Powered-By wiki page [1] ?

Thanks, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink

2016-10-04 14:04 GMT+02:00 Hironori Ogibayashi :

> Hello,
>
> Just for information.
>
> Last week, I have presented our Flink use case in my company's conference.
> (http://developers.linecorp.com/blog/?p=3992)
>
> Here is the slide.
> http://www.slideshare.net/linecorp/b-6-new-stream-processing-platformwith-
> apache-flink
> I think the video with English subtitle will also be published soon.
>
> The use case itself might not be very interesting, but I think this is the
> first Flink production use case in Japan opened to the public.
>
> Thank you for great software.
>
> Regards,
> Hironori Ogibayashi
>


Re: Handling decompression exceptions

2016-10-04 Thread Fabian Hueske
Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat
and forwards all calls to the wrapped format.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut
but would need to create an instance of your own InputFormat and add it with
env.readFile(yourIF).
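
Instead of wrapping and forwarding every method, a perhaps simpler variant is
to subclass the CsvInputFormat and give up on a split at the first
EOFException. A rough, untested sketch (the field types match your (String,
Long) example; delimiters, included fields and header skipping still have to
be configured like readCsvFile() does it):

import java.io.EOFException
import org.apache.flink.api.java.io.TupleCsvInputFormat
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.core.fs.{FileInputSplit, Path}

class LenientCsvInputFormat(
    path: Path,
    typeInfo: TupleTypeInfo[JTuple2[String, java.lang.Long]])
  extends TupleCsvInputFormat[JTuple2[String, java.lang.Long]](path, typeInfo) {

  private var corrupt = false

  override def open(split: FileInputSplit): Unit = {
    corrupt = false
    super.open(split)
  }

  override def nextRecord(reuse: JTuple2[String, java.lang.Long]): JTuple2[String, java.lang.Long] = {
    try {
      super.nextRecord(reuse)
    } catch {
      case _: EOFException =>
        corrupt = true // give up on the rest of this (truncated) file
        null
    }
  }

  override def reachedEnd(): Boolean = corrupt || super.reachedEnd()
}

You would then add it with env.readFile(yourFormat, "hdfs:///shared/logs/")
and the same recursive enumeration parameters as before. Depending on where a
file is truncated, open() might need the same try/catch guard as nextRecord().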

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI :

> Hi all,
>
> I am reading a large number of GZip compressed csv files, nested in a HDFS
> directory:
>
> Configuration parameters = new Configuration();
> parameters.setBoolean("recursive.file.enumeration", true);
> DataSet> hist = env.readCsvFile("hdfs:///
> shared/logs/")
> .ignoreFirstLine()
> .fieldDelimiter("|")
> .includeFields("011000")
> .types(String.class, Long.class)
> .withParameters(parameters);
>
> My job is failing with the following exception:
>
> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager  
>   - Status of job 66fce11277a4df6aa48dee636a81 (HDFSTest) 
> changed to FAILING.
>
> java.io.EOFException: Unexpected end of ZLIB input stream
>
>   at java.util.zip.InflaterInputStream.fill(Unknown Source)
>
>   at java.util.zip.InflaterInputStream.read(Unknown Source)
>
>   at java.util.zip.GZIPInputStream.read(Unknown Source)
>
>   at 
> org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>
>   at 
> org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>
>   at 
> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>
>   at 
> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>
>   at 
> org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>
>   at java.lang.Thread.run(Unknown Source)
>
> I think it is due to some improperly compressed files. How can I handle and
> ignore such exceptions? Thanks.
>
>
> Best,
> Yassine
>
>


Re: Processing events through web socket

2016-10-05 Thread Fabian Hueske
Hi,

the socket text stream source is rather meant for demo purposes than for use
in actual applications.
I am not aware of any other built-in source that would provide what you are
looking for.
You can implement a custom SourceFunction that does what you need.
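
A bare-bones skeleton of such a source (the actual TLS/websocket client code
is intentionally left out; any JVM websocket library can be plugged in where
indicated):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import org.apache.flink.streaming.api.functions.source.SourceFunction

class WebSocketSource(url: String) extends SourceFunction[String] {

  @volatile private var running = true

  // the websocket client's onMessage callback should put messages into this queue
  private lazy val messages = new LinkedBlockingQueue[String]()

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // 1) open the wss:// connection to `url` with a websocket client of your
    //    choice and register a callback that calls messages.put(msg)
    // 2) forward the received messages into the stream
    while (running) {
      val msg = messages.poll(100, TimeUnit.MILLISECONDS)
      if (msg != null) {
        ctx.collect(msg)
      }
    }
    // close the websocket connection here
  }

  override def cancel(): Unit = {
    running = false
  }
}

// usage: env.addSource(new WebSocketSource("wss://..."))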

Best, Fabian

2016-10-05 9:48 GMT+02:00 Abdul Salam Shaikh :

> I am trying to read data over a websocket from a source(For example wss://
> brisbane.unipulse.com.au:443/OutboundInterfaces/outbound/statements). But
> at the moment the implementation of env.socketTextStream only supports
> plain TCP and has no support for TLS or higher level protocols​. Is there
> any other wrapper/third party API implementation which I can use on top of
> flink to act as a streaming service before transmitting it to flink ?
>
> --
> Thanks.
>
>


Re: Presented Flink use case in Japan

2016-10-05 Thread Fabian Hueske
Hi Hironori,

thanks for getting the approval!
Your sentence is perfect.
I've add it to the Wiki page [1].

Best, Fabian

[1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink

2016-10-05 14:31 GMT+02:00 Hironori Ogibayashi :

> Hi.
>
> Yes, I am really looking forward to the next major release.
>
> By the way, I got OK from our PR department about adding our company
> name to the Powered-By page.
> Do I need to provide some information?
>
> Maybe the line will be:
> LINE(https://line.me/en/) uses Apache Flink for real-time log
> aggregation and system monitoring.
> or something like that?
>
> Regards,
> Hironori
>
> 2016-10-05 18:14 GMT+09:00 Stephan Ewen :
> > Hi!
> >
> > Thanks for sharing this.
> >
> > I am happy to say that many of the "weak point" issues are actually being
> > addressed right now, for the next major release, such as:
> >
> >   - Change of parallelism via savepoints
> >   - Compatibility of savepoints across versions
> >
> > Greetings,
> > Stephan
> >
> >
> > On Tue, Oct 4, 2016 at 11:56 PM, Hironori Ogibayashi <
> ogibaya...@gmail.com>
> > wrote:
> >>
> >> Thank you for the response.
> >> Regarding adding to the page, I will check with our PR department.
> >>
> >> Regards,
> >> Hironori
> >>
> >> 2016/10/04 21:12、Fabian Hueske  のメッセージ:
> >>
> >> Thanks Hironori for sharing these excellent news!
> >> Do you think it would be possible to add your use case to Flink's
> >> Powered-By wiki page [1] ?
> >>
> >> Thanks, Fabian
> >>
> >> [1] https://cwiki.apache.org/confluence/display/FLINK/Powered+by+Flink
> >>
> >> 2016-10-04 14:04 GMT+02:00 Hironori Ogibayashi :
> >>>
> >>> Hello,
> >>>
> >>> Just for information.
> >>>
> >>> Last week, I have presented our Flink use case in my company's
> >>> conference.
> >>> (http://developers.linecorp.com/blog/?p=3992)
> >>>
> >>> Here is the slide.
> >>>
> >>> http://www.slideshare.net/linecorp/b-6-new-stream-
> processing-platformwith-apache-flink
> >>> I think the video with English subtitle will also be published soon.
> >>>
> >>> The use case itself might not be very interesting, but I think this is
> >>> the
> >>> first Flink production use case in Japan opened to the public.
> >>>
> >>> Thank you for great software.
> >>>
> >>> Regards,
> >>> Hironori Ogibayashi
> >>
> >>
> >
>


Re: readCsvFile

2016-10-06 Thread Fabian Hueske
Hi Alberto,

if you want to read a single column you have to wrap it in a Tuple1:

val text4 = env.readCsvFile[Tuple1[String]]("file:data.csv",
  includedFields = Array(1))

Best, Fabian

2016-10-06 20:59 GMT+02:00 Alberto Ramón :

> I'm learning readCsvFile
> (I discovered that if the file ends with "\n", you get a null exception)
>
> *if I try to read only 1 column *
>
> val text4 = env.readCsvFile[String]("file:data.csv" ,includedFields = 
> Array(1))
>
> The error is: The type String has to be a tuple or pojo type. [null]
>
>
>
>
> *If I put > 1 column (*1st and 2nd in this case*)*
>
> val text4 = env.readCsvFile [(String,String)]("data.csv"
>   ,fieldDelimiter = ","
>   ,includedFields = Array(0,1))
>
> Read all columns from, CSV (3 in my example)
>
>
>
>


Re: Csv data stream

2016-10-06 Thread Fabian Hueske
Hi,

how are you executing your code? From an IDE or on a running Flink instance?
If you execute it on a running Flink instance, you have to look into the
.out files of the task managers (located in ./log/).

Best, Fabian


2016-10-06 22:08 GMT+02:00 drystan mazur :

> Hello, I am reading a csv file with Flink 1.1.2. The file loads and runs but
> printing shows nothing?
>
> TupleCsvInputFormat oilDataIn;
> TupleTypeInfo<Tuple9<String,String,String,String,String,String,String,String,String>> oildataTypes;
> BasicTypeInfo[] types = {BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO,
>     BasicTypeInfo.STRING_TYPE_INFO};
> oildataTypes = new TupleTypeInfo<>(types);
> oilDataIn = new TupleCsvInputFormat<>(oilPath, "\n", ",", oildataTypes);
> oilDataStream = env.createInput(oilDataIn, new TupleTypeInfo(Tuple9.class, types));
> oilDataStream.print();
> env.execute("Flink Java API Skeleton");
>
> The code runs ok, I just wanted to view the datastream. What am I doing wrong?
> Thanks
> --
> View this message in context: Csv data stream
> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>


Re: DataStream csv reading

2016-10-06 Thread Fabian Hueske
Hi Greg,

print is only eagerly executed for DataSet programs.
In the DataStream API, print() just appends a print sink and execute() is
required to trigger an execution.


2016-10-06 22:40 GMT+02:00 Greg Hogan :

> The program executes when you call print (same for collect), which is why
> you are seeing an error when calling execute (since there is no new job to
> execute). As Fabian noted, you'll need to look in the TaskManager log files
> for the printed output if running on a cluster.
>
> On Thu, Oct 6, 2016 at 4:21 PM, drystan mazur 
> wrote:
>
>> Hello I am reading a csv file with flink 1.1.2 the file loads and runs
>> but printing shows nothing ?
>>
>> 
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>> env.setParallelism(1);
>> System.out.println(env);
>>
>> DataStream<Tuple9<String,String,String,String,String,String,
>>     String,String,String>> oilDataStream;
>> DataStreamSink oildataSink;
>> //String path = 
>> Paths.get(".").toAbsolutePath().normalize().toString();
>> String path = "/quickstart/test_data/oil_price.csv";
>>
>>
>> Path oilPath = new Path(path);
>>
>>
>> TupleCsvInputFormat oilDataIn;
>>
>> TupleTypeInfo<Tuple9<String,String,String,String,String,String,
>>     String,String,String>> oildataTypes;
>>
>> BasicTypeInfo[] types = 
>> {BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,
>> 
>> BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,
>> 
>> BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO};
>>
>> oildataTypes = new TupleTypeInfo<>(types);
>> oilDataIn = new TupleCsvInputFormat<>(oilPath,"\n",",",oildataTypes);
>>
>>
>> oilDataStream = env.createInput(oilDataIn,new 
>> TupleTypeInfo(Tuple9.class, types));
>> oilDataStream.print();
>>
>>
>>
>>
>>  env.execute("Flink Java API Skeleton");
>>
>>
>> 
>>
>> The code runs ok I just wanted to view the datastream what I am doing
>> wrong ? Thanks
>> --
>> View this message in context: DataStream csv reading
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
>>
>
>


Re: Merging N parallel/partitioned WindowedStreams together, one-to-one, into a global window stream

2016-10-06 Thread Fabian Hueske
Maybe this can be done by assigning the same window id to each of the N
local windows, and do a

.keyBy(windowId)
.countWindow(N)

This should create a new global window for each window id and collect all N
windows.

Best, Fabian

2016-10-06 22:39 GMT+02:00 AJ Heller :

> The goal is:
>  * to split data, random-uniformly, across N nodes,
>  * window the data identically on each node,
>  * transform the windows locally on each node, and
>  * merge the N parallel windows into a global window stream, such that one
> window from each parallel process is merged into a "global window" aggregate
>
> I've achieved all but the last bullet point, merging one window from each
> partition into a globally-aggregated window output stream.
>
> To be clear, a rolling reduce won't work because it would aggregate over
> all previous windows in all partitioned streams, and I only need to
> aggregate over one window from each partition at a time.
>
> Similarly for a fold.
>
> The closest I have found is ParallelMerge for ConnectedStreams, but I have
> not found a way to apply it to this problem. Can flink achieve this? If so,
> I'd greatly appreciate a point in the right direction.
>
> Cheers,
> -aj
>


Re: Merging N parallel/partitioned WindowedStreams together, one-to-one, into a global window stream

2016-10-07 Thread Fabian Hueske
If you are using time windows, you can access the TimeWindow parameter of
the WindowFunction.apply() method.
The TimeWindow contains the start and end timestamp of a window (as Long)
which can act as keys.

If you are using count windows, I think you have to use a counter as you
described.


2016-10-07 1:06 GMT+02:00 AJ Heller :

> Thank you Fabian, I think that solves it. I'll need to rig up some tests
> to verify, but it looks good.
>
> I used a RichMapFunction to assign ids incrementally to windows (mapping
> STREAM_OBJECT to Tuple2 using a private long value in
> the mapper that increments on every map call). It works, but by any chance
> is there a more succinct way to do it?
>
> On Thu, Oct 6, 2016 at 1:50 PM, Fabian Hueske  wrote:
>
>> Maybe this can be done by assigning the same window id to each of the N
>> local windows, and do a
>>
>> .keyBy(windowId)
>> .countWindow(N)
>>
>> This should create a new global window for each window id and collect all
>> N windows.
>>
>> Best, Fabian
>>
>> 2016-10-06 22:39 GMT+02:00 AJ Heller :
>>
>>> The goal is:
>>>  * to split data, random-uniformly, across N nodes,
>>>  * window the data identically on each node,
>>>  * transform the windows locally on each node, and
>>>  * merge the N parallel windows into a global window stream, such that
>>> one window from each parallel process is merged into a "global window"
>>> aggregate
>>>
>>> I've achieved all but the last bullet point, merging one window from
>>> each partition into a globally-aggregated window output stream.
>>>
>>> To be clear, a rolling reduce won't work because it would aggregate over
>>> all previous windows in all partitioned streams, and I only need to
>>> aggregate over one window from each partition at a time.
>>>
>>> Similarly for a fold.
>>>
>>> The closest I have found is ParallelMerge for ConnectedStreams, but I
>>> have not found a way to apply it to this problem. Can flink achieve this?
>>> If so, I'd greatly appreciate a point in the right direction.
>>>
>>> Cheers,
>>> -aj
>>>
>>
>>
>


Re: jdbc.JDBCInputFormat

2016-10-07 Thread Fabian Hueske
As the exception says the class
org.apache.flink.api.scala.io.jdbc.JDBCInputFormat does not exist.

You have to do:

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat

There is no Scala implementation of this class but you can also use Java
classes in Scala.
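
For completeness, using the Java builder from Scala looks roughly like this.
The driver, URL and query are placeholders, and you also need the flink-jdbc
dependency plus the JDBC driver on the classpath:

import org.apache.flink.api.java.io.jdbc.JDBCInputFormat

val jdbcInput = JDBCInputFormat.buildJDBCInputFormat()
  .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
  .setDBUrl("jdbc:derby:memory:mydb")
  .setQuery("SELECT id, name FROM users")
  .finish()

The format produces Java tuples, so pass it to env.createInput() together with
matching type information (e.g. a TupleTypeInfo for the query's columns).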

2016-10-07 21:38 GMT+02:00 Alberto Ramón :

>
> I want to use createInput + buildJDBCInputFormat to access a database from Scala
>
> PB1:
>
> import org.apache.flink.api.scala.io.jdbc.JDBCInputFormat
> Error:(25, 37) object jdbc is not a member of package 
> org.apache.flink.api.java.io
> import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
>
> Then, I can't use:
> [image: embedded image 1]
>
> I tried to download code from git and recompile, also
>
>


Re: readCsvFile

2016-10-07 Thread Fabian Hueske
I would check that the field delimiter is correctly set.

With the correct delimiter your code would give

((a),1)
((aa),1)

because the single field is wrapped in a Tuple1.
You have to unwrap it in the map function: .map { t => (t._1, 1) }
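
Put together, and assuming the column you want is the first one (index 0), it
would look roughly like this:

val counts = env
  .readCsvFile[Tuple1[String]]("data.csv", fieldDelimiter = ",", includedFields = Array(0))
  .map { t => (t._1, 1) }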

2016-10-07 18:08 GMT+02:00 Alberto Ramón :

> Humm
>
> Your solution compiles without errors, but includedFields isn't working:
> [image: embedded image 1]
>
> The output is incorrect:
> [image: embedded image 2]
>
> The correct result should be only the 1st column:
> (a,1)
> (aa,1)
>
> 2016-10-06 21:37 GMT+02:00 Fabian Hueske :
>
>> Hi Alberto,
>>
>> if you want to read a single column you have to wrap it in a Tuple1:
>>
>> val text4 = env.readCsvFile[Tuple1[String]]("file:data.csv" ,includedFields 
>> = Array(1))
>>
>> Best, Fabian
>>
>> 2016-10-06 20:59 GMT+02:00 Alberto Ramón :
>>
>>> I'm learning readCsvFile
>>> (I discover if the file ends on "/n", you will return a null exception)
>>>
>>> *if I try to read only 1 column *
>>>
>>> val text4 = env.readCsvFile[String]("file:data.csv" ,includedFields = 
>>> Array(1))
>>>
>>> The error is: he type String has to be a tuple or pojo type. [null]
>>>
>>>
>>>
>>>
>>> *If  I put > 1 column; (*1º and 2º in this case*)*
>>>
>>> val text4 = env.readCsvFile [(String,String)]("data.csv"
>>>   ,fieldDelimiter = ","
>>>   ,includedFields = Array(0,1))
>>>
>>> Read all columns from, CSV (3 in my example)
>>>
>>>
>>>
>>>
>>
>


<    1   2   3   4   5   6   7   8   9   10   >