Yahoo! Streaming Benchmark with Flink

2016-10-27 Thread Eric Fukuda
Hi,

I have two questions on the blog post on Yahoo! Streaming Benchmark with
Flink [1].

First is about the join operation to associate ad_ids and campaign_ids. In
flink.benchmark.state.AdvertisingTopologyFlinkStateHighKeyCard, I don't see
this being done. Is there a reason for this?

Second is about Akka actor. Reading
flink.benchmark.state.QueryableWindowOperator or
flink.benchmark.state.QueryableWindowOperatorEvicting, it looks like the
Akka actor is being prepared but not used in the actual processing
(processElement()). Is this correct? And how do I enable Akka in the job?

[1] http://data-artisans.com/extending-the-yahoo-streaming-benchmark/

Thanks,
Eric


Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

2016-10-27 Thread Stefan Richter
Hi,

I tracked down the problem and have a fix in this PR:
https://github.com/apache/flink/pull/2706
Besides the misleading warning, the code should still behave correctly in the old version.

Best,
Stefan

> On 27.10.2016 at 17:25, Robert Metzger wrote:
> 
> Hi,
> it would be nice if you could check with a stable version as well.
> 
> Thank you!
> 
> On Thu, Oct 27, 2016 at 9:58 AM, PedroMrChaves wrote:
> Hello,
> 
> I Am using the version 1.2-SNAPSHOT.
> I will try with a stable version to see if the problem persists.
> 
> Regards,
> Pedro Chaves.
> 
> 
> 
> 



Re: TIMESTAMP TypeInformation

2016-10-27 Thread Greg Hogan
Could be. I had thought TypeInfoParser was closely related to TypeExtractor.

On Thu, Oct 27, 2016 at 10:20 AM, Fabian Hueske  wrote:

> Wouldn't that be orthogonal to adding it to the TypeInfoParser?
>
> 2016-10-27 15:22 GMT+02:00 Greg Hogan :
>
>> Fabian,
>>
>> Should we instead add this as a registered TypeInfoFactory?
>>
>> Greg
>>
>> On Thu, Oct 27, 2016 at 3:55 AM, Fabian Hueske  wrote:
>>
>>> Yes, I think you are right.
>>> TypeInfoParser needs to be extended to parse the java.sql.* types into
>>> the corresponding TypeInfos.
>>>
>>> Can you open a JIRA for that?
>>>
>>> Thanks, Fabian
>>>
>>> 2016-10-27 9:31 GMT+02:00 Radu Tudoran :
>>>
 Hi,



I have meanwhile dug more through this and I think I actually found a bug.



 The scenario that I was trying to describe was something like

 1.   You have a generic datastream with Tuple (alternatively I
 could move to row I guess) and you get the data from whatever stream (e.g.,
 kafka, network socket…)

 2.   In the map/flat map function you parse and instantiate the
 tuple generically

 3.   In the “returns()” function of the map you enforce the types



 DataStream<Tuple> = env.socketTextStream(…)
     .map(new MapFunction<String, Tuple>() {
         public Tuple map(String value) {
             Tuple out = Tuple.getTupleClass(#)
             …
             out.setField(SqlTimeTypeInfo.TIMESTAMP, 0)
             …
         }
     }).returns("Tuple#");





 The problem is that if you rely on the type extraction mechanism called
 after returns() to recognize TIMESTAMP as SqlTimeTypeInfo, it will not
 happen; instead a GenericType will be created.

 It looks like the type parsers were not extended to consider these types.






 *From:* Fabian Hueske [mailto:fhue...@gmail.com]
 *Sent:* Wednesday, October 26, 2016 10:11 PM
 *To:* user@flink.apache.org
 *Subject:* Re: TIMESTAMP TypeInformation



 Hi Radu,

 I might not have complete understood your problem, but if you do

 val env = StreamExecutionEnvironment.getExecutionEnvironment
 val tEnv = TableEnvironment.getTableEnvironment(env)

 val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
 val t = ds.toTable(tEnv, 'a, 'b, 'c)

 val results = t
 .select('c + 10.seconds)

 then field 'c will be of type SqlTimeTypeInfo and handled as such.

 Hope this helps,

 Fabian



 2016-10-25 17:32 GMT+02:00 Radu Tudoran :

 Re-hi,



 I actually realized that the problem comes from the fact that the
 datastream that I am registering does not create properly the types.



 I am using something like



 DataStream … .returns("TupleX<,…,java.sql.Timestamp,
 java.sql.Time>") … and I was expecting that these would be converted to
 SqlTimeTypeInfo … but it is converted to GenericType. Any thoughts on how I could
 force the type to be recognized as a SqlTimeType?





 *From:* Radu Tudoran
 *Sent:* Tuesday, October 25, 2016 4:46 PM
 *To:* 'user@flink.apache.org'
 *Subject:* TIMESTAMP TypeInformation



 Hi,



 I would like to create a 

Re: About stateful transformations

2016-10-27 Thread Juan Rodríguez Hortalá
Hi Aljoscha,

Thanks for your answer. At least by keeping only the latest one we don't
have retention problems with the state backend, and for now I guess we
could use manually triggered savepoints if we needed to store the history
of the state.

Thanks,

Juan

On Tue, Oct 25, 2016 at 6:58 AM, Aljoscha Krettek 
wrote:

> Hi,
> there is already a mechanism for that. Currently, Flink will only keep the
> most recent, successful checkpoint. We are currently working on making that
> configurable so that, for example, the last n successful checkpoints can be
> kept.
>
> Cheers,
> Aljoscha
>
> On Tue, 25 Oct 2016 at 06:47 Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi Gyula,
>>
>> Thanks a lot for your response, it was very clear. I understand that
>> there is no problem of small files due to checkpointing not being
>> incremental. I also understand that each worker will interpret a file://
>> URL as local to its own file system, which works ok if all workers have a
>> remote file system mounted in the same local path.
>>
>> Now I have to check if Flink provides some expiration mechanism for old
>> checkpoints, although for S3 that is already available, and for HDFS I
>> guess some script that periodically deletes older files with hdfs dfs
>> -rmr should be easy enough. Is there any documentation about some naming
>> convention for checkpoint files that I could rely to delete old
>> checkpoints? E.g. some naming schema that uses dates, it would be nicer if
>> it was documented because then it would be more stable.
>>
>> Thanks again for your help.
>>
>> Greetings,
>>
>> Juan
>>
>>
>> On Mon, Oct 24, 2016 at 12:29 AM, Gyula Fóra  wrote:
>>
>> Hi Juan,
>>
>> Let me try to answer some of your questions :)
>>
>> We have been running Flink Streaming at King for quite some time now with
>> multiple jobs having several hundred gigabytes of KV state stored in
>> RocksDB. I would say RocksDB state backend is definitely the best choice at
>> the moment for large deployments as you can also keep the heap relatively
>> small to save some time on GC. But you have to play around with the rocks
>> configuration to get the most out of it depending on your hardware.
>>
>> I am not aware of any caching/TTL functionality exposed in the Flink APIs
>> currently. But if you are willing to dig a little deeper you could
>> implement a lower level operator that uses timers like the windowing
>> mechanisms to clear state after some time.
>>
>> When you are selecting a checkpoint directory (URL) you need to make sure
>> that it is accessible from all the task managers. HDFS is convenient but is
>> not strictly necessary. We for instance use CEPH that is mounted as a
>> regular disk from the OS's perspective so we can use file:// and still save
>> to the distributed storage. As far as I know using YARN doesn't give much
>> benefit, I am not sure if Flink exploits any data locality at this moment.
>>
>> When you are running rocks db state backend there are two concepts you
>> need to think about for checkpointing. Your local rocks db directory, and
>> the checkpoint directory. The local directory is where the rocks instances
>> are created and they live on the taskmanagers local disk/memory. When Flink
>> takes a checkpoint a backup of all K-V pairs is copied as one blob to HDFS
>> or to the selected checkpoint directory. This means there is no data
>> fragmentation in the checkpoints. Similar applies to the FsStateBackend but
>> that keeps the local state strictly in memory.
>>
>> I think you should definitely give RocksDB + HDFS a try. It works
>> extremely well for very large state sizes given some tuning, but should
>> also perform out-of-the-box :)
>>
>> Cheers,
>> Gyula
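
A minimal configuration sketch of the setup Gyula describes above; the checkpoint
URI, local path and interval are placeholders of my own, not values from this thread:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint directory: must be reachable from all TaskManagers (HDFS, S3, a mounted CEPH path, ...)
        RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
        // local directory where the RocksDB instances live on each TaskManager
        backend.setDbStoragePath("/tmp/flink/rocksdb");

        env.setStateBackend(backend);
        env.enableCheckpointing(60_000); // take a checkpoint every 60 seconds

        // ... define sources, keyed state, sinks, then env.execute(...)
    }
}

The RocksDB backend ships in the separate flink-statebackend-rocksdb module, which
has to be on the job's classpath.
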
>>
>> Juan Rodríguez Hortalá wrote
>> (on Sun, 23 Oct 2016, 22:29):
>>
>> Hi all,
>>
>> I don't have much experience with Flink, so please forgive me if I ask
>> some obvious questions. I was taking a look at the documentation
>> on stateful transformations in Flink at
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html. I'm mostly
>> interested in Flink for stream processing, and I would like to know:
>>
>> - What is the biggest state that has been used in production deployments?
>> I'm interested in how many GB of state, among all key-value pairs, have
>> been used before in long running streaming jobs deployed in production.
>> Maybe some company has shared this in some conference or blog post. I guess
>> for that RocksDB backend is the best option for big states, to avoid being
>> limited by the available memory.
>>
>> - Is there any pre built functionality for state eviction? I'm thinking
>> of LRU cache-like behavior, with eviction based on time or size, similar to
>> Guava cache (https://github.com/google/guava/wiki/CachesExplained). This
>> is probably easy to implement anyway, by using the clear() primitive, but I
>> 

Re: Can we do batch writes on cassandra using flink while leveraging the locality?

2016-10-27 Thread Shannon Carey
It certainly seems possible to write a Partitioner that does what you describe. 
I started implementing one but didn't have time to finish it. I think the main 
difficulty is in properly dealing with partition ownership changes in 
Cassandra… if you are maintaining state in Flink and the partitioning changes, 
your job might produce inaccurate output. If, on the other hand, you are only 
using the partitioner just before the output, dynamic partitioning changes 
might be ok.
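
A sketch of that idea, purely as an assumption on my part rather than code from this
thread: Flink's partitionCustom() can route records by their Cassandra partition key
before the sink, so rows of the same partition can be batched on one subtask; a
genuinely replica-aware mapping would additionally need the DataStax driver's cluster
metadata, which is only hinted at in the comments.

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class CassandraKeyPartitioner implements Partitioner<String> {
    @Override
    public int partition(String cassandraPartitionKey, int numPartitions) {
        // Placeholder: stable hash of the partition key so that all rows of one
        // Cassandra partition end up in the same Flink subtask and can be batched
        // together. A locality-aware version would instead look up which replica
        // owns the key's token and map that replica to a subtask.
        return Math.abs(cassandraPartitionKey.hashCode() % numPartitions);
    }
}

// usage sketch, assuming records are Tuple2<partitionKey, payload>:
// DataStream<Tuple2<String, String>> routed =
//     records.partitionCustom(new CassandraKeyPartitioner(), 0);
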


From: kant kodali
Date: Thursday, October 27, 2016 at 3:17 AM
To: user@flink.apache.org
Subject: Can we do batch writes on cassandra using flink while leveraging the 
locality?

Can we do batch writes on Cassandra using Flink while leveraging the
locality? For example, batch writes in Cassandra will put pressure on the
coordinator, but since the connectors are built by leveraging locality, I was
wondering if we could do a batch of writes on the node where the batch belongs?


Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

2016-10-27 Thread Robert Metzger
Hi,
it would be nice if you could check with a stable version as well.

Thank you!

On Thu, Oct 27, 2016 at 9:58 AM, PedroMrChaves 
wrote:

> Hello,
>
> I Am using the version 1.2-SNAPSHOT.
> I will try with a stable version to see if the problem persists.
>
> Regards,
> Pedro Chaves.
>
>
>
>


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-27 Thread Manu Zhang
Hi,

It's what I'm seeing. If timers are not fired at the end of the window, state
(in the window) whose timestamp is *after* the timer will also be emitted.
That's a problem for an event-time trigger.

Thanks,
Manu


On Thu, Oct 27, 2016 at 10:29 PM Aljoscha Krettek 
wrote:

> Hi,
> is that example input/output what you would like to achieve or what you
> are currently seeing with Flink? I think for your use case a custom Trigger
> would be required that works like the event-time trigger but additionally
> registers timers for each element where you want to emit.
>
> Cheers,
> Aljoscha
>
> On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:
>
> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. a session gap of 5)
>
> PageView("user1", "http://foo;, 1)
> PageView("user1", "http://foo/bar;, 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
> *", [1,6])
> PageView("user1", "http://foo/bar/foobar;, 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar *", [1, 10])
>
> The urls in bold should be included since there could be events before
> them not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamp between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because watermark trigger doesn't check whether element's
> timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>


Re: Testing iterative data flows

2016-10-27 Thread Ufuk Celebi
Hey Ken! Unfortunately, no. But Paris just posted a proposal to
improve this: 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-13-Consistent-Processing-with-Loops-tt14149.html

On Wed, Oct 26, 2016 at 11:10 PM, Ken Krugler
 wrote:
> Hi all,
>
> What’s the recommended way currently to test a streaming data flow that has
> iterations?
>
> I know that using timeouts in tests (which FLINK-2390 also discusses) isn’t
> reliable, and it’s hard to know when a job with iterations is really “done”
> in the context of a test.
>
> Are there any other approaches with current versions of Flink that would be
> better than an arbitrary timeout?
>
> Thanks,
>
> — Ken
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>


emit partial state in window (streaming)

2016-10-27 Thread Luis Mariano Guerra
hi,

 I need to calculate some counts for the day but also emit the partial
counts periodically. I think triggers may help me; I'm searching around but
there's not much content about it. Any tips?

For example, I'm counting accesses by location to different services; I want
to accumulate accesses during the whole day, but emit the partial
count every 5 minutes.

One slightly related question: is there a way to align a window to a day?
For example, a 24-hour window that starts at 00:00.

thanks.


Re: emit partial state in window (streaming)

2016-10-27 Thread Fabian Hueske
Hi Luis,

these blog posts should help you with the periodic partial-result trigger
[1] [2].

Regarding the second question:
time windows are by default aligned to 1970-01-01 00:00:00 (the epoch),
so a 24-hour window will always start at 00:00.

Best, Fabian

[1] http://flink.apache.org/news/2015/12/04/Introducing-windows.html
[2]
https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
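
A minimal sketch of that combination (the field layout and names are my own
assumptions, not from this thread): a one-day tumbling event-time window per
location whose ContinuousEventTimeTrigger fires the current partial count every
5 minutes without purging the accumulated state.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PartialDailyCounts {

    // accesses: (location, 1) pairs with timestamps and watermarks already assigned
    public static DataStream<Tuple2<String, Integer>> partialCounts(
            DataStream<Tuple2<String, Integer>> accesses) {
        return accesses
            .keyBy(0)                                           // one count per location
            .window(TumblingEventTimeWindows.of(Time.days(1)))  // day window, aligned to 00:00
            .trigger(ContinuousEventTimeTrigger.<TimeWindow>of(Time.minutes(5))) // partial fire every 5 min
            .sum(1);                                            // running count for the day so far
    }
}
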

2016-10-27 16:31 GMT+02:00 Luis Mariano Guerra :

> hi,
>
>  I need to calculate some counts for the day but also emit the partial
> counts periodically, I think triggers may help me, I'm searching around but
> there's not much content about it, any tip?
>
> for example I'm counting access by location to different services, I want
> to accumulate access during the whole day, but I want to emit the partial
> count every 5 minutes.
>
> one slightly related question, is there a way to align a window to a day?
> for example a 24 hour window that starts at 00:00.
>
> thanks.
>


Re: watermark trigger doesn't check whether element's timestamp is passed

2016-10-27 Thread Aljoscha Krettek
Hi,
is that example input/output what you would like to achieve or what you are
currently seeing with Flink? I think for your use case a custom Trigger
would be required that works like the event-time trigger but additionally
registers timers for each element where you want to emit.

Cheers,
Aljoscha
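
A minimal sketch of such a trigger, written here from the description above rather
than taken from existing Flink code. For merging SessionWindows it would additionally
need canMerge()/onMerge(), and because each firing still emits the full window
contents, elements whose timestamps lie beyond the firing time would have to be
filtered out in the window function (which is the effect Manu describes).

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class PerElementEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        ctx.registerEventTimeTimer(timestamp);              // fire when the watermark passes this element
        ctx.registerEventTimeTimer(window.maxTimestamp());   // and finally at the end of the window
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time >= window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;                       // purely event-time based
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
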

On Wed, 26 Oct 2016 at 04:04 Manu Zhang  wrote:

> Hi Aljoscha,
>
> Thanks for your response.  My use case is to track user trajectory based
> on page view event when they visit a website.  The input would be like a
> list of PageView(userId, url, eventTimestamp) with watermarks (=
> eventTimestamp - duration). I'm trying SessionWindows with some event time
> trigger. Note we can't wait for the end of session window due to latency.
> Instead, we want to emit the user trajectories whenever a buffered
> PageView's event time is passed by watermark. I tried
> ContinuousEventTimeTrigger and a custom trigger which sets timer on each
> element's timestamp. For both triggers I've witnessed a problem like the
> following (e.g. a session gap of 5)
>
> PageView("user1", "http://foo;, 1)
> PageView("user1", "http://foo/bar;, 2)
> Watermark(1) => emit UserTrajectory("user1", "http://foo -> *http://foo/bar
> *", [1,6])
> PageView("user1", "http://foo/bar/foobar;, 5)
> Watermark(4) => emit UserTrajectory("user1", "http://foo -> http://foo/bar ->
> *http://foo/bar/foobar *", [1, 10])
>
> The urls in bold should be included since there could be events before
> them not arrived yet.
>
>
> Thanks,
> Manu
>
>
> On Tue, Oct 25, 2016 at 1:36 AM Aljoscha Krettek 
> wrote:
>
> Hi,
> with some additional information we might be able to figure this out
> together. What specific combination of WindowAssigner/Trigger are you using
> for your example and what is the input stream (including watermarks)?
>
> Cheers,
> Aljoscha
>
> On Mon, 24 Oct 2016 at 06:30 Manu Zhang  wrote:
>
> Hi,
>
> Say I have a window state of List(("a", 1:00), ("b", 1:03), ("c", 1:06))
> which is triggered to emit when watermark passes the timestamp of an
> element. For example,
>
> on watermark(1:01), List(("a", 1:00)) is emitted
> on watermark(1:04), List(("a", 1:00), ("b", 1:03)) is emitted
> on watermark(1:07), List(("a", 1:00), ("b", 1:03), ("c", 1:06)) is emitted
>
> It seems that if *("c", 1:06) is processed before watermark(1:04)*
> List(("a", 1:00), ("b", 1:03), ("c", 1:06)) will be emitted on
> watermark(1:04). This is incorrect since there could be elements with
> timestamp between 1:04 and 1:06 that have not arrived yet.
>
> I guess this is because watermark trigger doesn't check whether element's
> timestamp has been passed.
>
> Please correct me if any of the above is not right.
>
> Thanks,
> Manu Zhang
>
>
>
>


Re: TIMESTAMP TypeInformation

2016-10-27 Thread Fabian Hueske
Wouldn't that be orthogonal to adding it to the TypeInfoParser?

2016-10-27 15:22 GMT+02:00 Greg Hogan :

> Fabian,
>
> Should we instead add this as a registered TypeInfoFactory?
>
> Greg
>
> On Thu, Oct 27, 2016 at 3:55 AM, Fabian Hueske  wrote:
>
>> Yes, I think you are right.
>> TypeInfoParser needs to be extended to parse the java.sql.* types into
>> the corresponding TypeInfos.
>>
>> Can you open a JIRA for that?
>>
>> Thanks, Fabian
>>
>> 2016-10-27 9:31 GMT+02:00 Radu Tudoran :
>>
>>> Hi,
>>>
>>>
>>>
>>> I dig meanwhile more through this and I think I found a bug actually.
>>>
>>>
>>>
>>> The scenario that I was trying to describe was something like
>>>
>>> 1.   You have a generic datastream with Tuple (alternatively I
>>> could move to row I guess) and you get the data from whatever stream (e.g.,
>>> kafka, network socket…)
>>>
>>> 2.   In the map/flat map function you parse and instantiate the
>>> tuple generically
>>>
>>> 3.   In the “returns()” function of the map you enforce the types
>>>
>>>
>>>
>>> DataStream = env.socketTextStream(…)
>>>
>>> .map(new mapFunction(){
>>>
>>> Public Tuple map(String value){
>>>
>>> Tuple out  =
>>> Tuple.getTupleClass(#)
>>>
>>> …
>>>
>>> out.setField(SqlTimeTypeInfo.TIMESTAMP,0)
>>>
>>> …
>>>
>>>
>>>
>>> }}) .returns(“Tuple#”);
>>>
>>>
>>>
>>>
>>>
>>> The problem is that if you rely on the type extraction mechanism called
>>> after the returns to recognize TIMESTAMP of type SqlTimeTypeInfo it will
>>> not happen but instead a GenericType will be created.
>>>
>>> It looks like the type parsers were not extended to consider this types
>>>
>>>
>>>
>>>
>>>
>>>
>>> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
>>> *Sent:* Wednesday, October 26, 2016 10:11 PM
>>> *To:* user@flink.apache.org
>>> *Subject:* Re: TIMESTAMP TypeInformation
>>>
>>>
>>>
>>> Hi Radu,
>>>
>>> I might not have complete understood your problem, but if you do
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>>
>>> val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
>>> val t = ds.toTable(tEnv, 'a, 'b, 'c)
>>>
>>> val results = t
>>> .select('c + 10.seconds)
>>>
>>> then field 'c will be of type SqlTimeTypeInfo and handled as such.
>>>
>>> Hope this helps,
>>>
>>> Fabian
>>>
>>>
>>>
>>> 2016-10-25 17:32 GMT+02:00 Radu Tudoran :
>>>
>>> Re-hi,
>>>
>>>
>>>
>>> I actually realized that the problem comes from the fact that the
>>> datastream that I am registering does not create properly the types.
>>>
>>>
>>>
>>> I am using something like
>>>
>>>
>>>
>>> DataStream … .returns(“TupleX<,….,java.sql.Timestamp,
>>> java.sql.Time>”)…and I was expecting that these will be converted to
>>> SqlTimeTypeInfo…but it is converted to GenericType. Anythoughts how I could
>>> force the type to be recognize as a SqlTimeType?
>>>
>>>
>>>
>>>
>>>
>>> *From:* Radu Tudoran
>>> *Sent:* Tuesday, October 25, 2016 4:46 PM
>>> *To:* 'user@flink.apache.org'
>>> *Subject:* TIMESTAMP TypeInformation
>>>
>>>
>>>
>>> Hi,
>>>
>>>
>>>
>>> I would like to create a TIMESTAMP type from the data schema. I would
>>> need this to match against the FlinkTypeFactory (toTypeInfo())
>>>
>>>
>>>
>>> *def* toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
>>> relDataType.getSqlTypeName *match* {
>>>
>>> *case* BOOLEAN => BOOLEAN_TYPE_INFO
>>>
>>> *case* TINYINT => BYTE_TYPE_INFO
>>>
>>>   

Re: TIMESTAMP TypeInformation

2016-10-27 Thread Greg Hogan
Fabian,

Should we instead add this as a registered TypeInfoFactory?

Greg

On Thu, Oct 27, 2016 at 3:55 AM, Fabian Hueske  wrote:

> Yes, I think you are right.
> TypeInfoParser needs to be extended to parse the java.sql.* types into the
> corresponding TypeInfos.
>
> Can you open a JIRA for that?
>
> Thanks, Fabian
>
> 2016-10-27 9:31 GMT+02:00 Radu Tudoran :
>
>> Hi,
>>
>>
>>
>> I dig meanwhile more through this and I think I found a bug actually.
>>
>>
>>
>> The scenario that I was trying to describe was something like
>>
>> 1.   You have a generic datastream with Tuple (alternatively I could
>> move to row I guess) and you get the data from whatever stream (e.g.,
>> kafka, network socket…)
>>
>> 2.   In the map/flat map function you parse and instantiate the
>> tuple generically
>>
>> 3.   In the “returns()” function of the map you enforce the types
>>
>>
>>
>> DataStream = env.socketTextStream(…)
>>
>> .map(new mapFunction(){
>>
>> Public Tuple map(String value){
>>
>> Tuple out  =
>> Tuple.getTupleClass(#)
>>
>> …
>>
>> out.setField(SqlTimeTypeInfo.TIMESTAMP,0)
>>
>> …
>>
>>
>>
>> }}) .returns(“Tuple#”);
>>
>>
>>
>>
>>
>> The problem is that if you rely on the type extraction mechanism called
>> after the returns to recognize TIMESTAMP of type SqlTimeTypeInfo it will
>> not happen but instead a GenericType will be created.
>>
>> It looks like the type parsers were not extended to consider this types
>>
>>
>>
>>
>>
>>
>> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
>> *Sent:* Wednesday, October 26, 2016 10:11 PM
>> *To:* user@flink.apache.org
>> *Subject:* Re: TIMESTAMP TypeInformation
>>
>>
>>
>> Hi Radu,
>>
>> I might not have complete understood your problem, but if you do
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tEnv = TableEnvironment.getTableEnvironment(env)
>>
>> val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
>> val t = ds.toTable(tEnv, 'a, 'b, 'c)
>>
>> val results = t
>> .select('c + 10.seconds)
>>
>> then field 'c will be of type SqlTimeTypeInfo and handled as such.
>>
>> Hope this helps,
>>
>> Fabian
>>
>>
>>
>> 2016-10-25 17:32 GMT+02:00 Radu Tudoran :
>>
>> Re-hi,
>>
>>
>>
>> I actually realized that the problem comes from the fact that the
>> datastream that I am registering does not create properly the types.
>>
>>
>>
>> I am using something like
>>
>>
>>
>> DataStream … .returns(“TupleX<,….,java.sql.Timestamp,
>> java.sql.Time>”)…and I was expecting that these will be converted to
>> SqlTimeTypeInfo…but it is converted to GenericType. Anythoughts how I could
>> force the type to be recognize as a SqlTimeType?
>>
>>
>>
>>
>>
>> *From:* Radu Tudoran
>> *Sent:* Tuesday, October 25, 2016 4:46 PM
>> *To:* 'user@flink.apache.org'
>> *Subject:* TIMESTAMP TypeInformation
>>
>>
>>
>> Hi,
>>
>>
>>
>> I would like to create a TIMESTAMP type from the data schema. I would
>> need this to match against the FlinkTypeFactory (toTypeInfo())
>>
>>
>>
>> *def* toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
>> relDataType.getSqlTypeName *match* {
>>
>> *case* BOOLEAN => BOOLEAN_TYPE_INFO
>>
>> *case* TINYINT => BYTE_TYPE_INFO
>>
>> *case* SMALLINT => SHORT_TYPE_INFO
>>
>> *case* INTEGER => INT_TYPE_INFO
>>
>> *case* BIGINT => LONG_TYPE_INFO
>>
>> *case* FLOAT => FLOAT_TYPE_INFO
>>
>> *case* DOUBLE => DOUBLE_TYPE_INFO
>>
>> *case* VARCHAR | CHAR => STRING_TYPE_INFO
>>
>> *case* DECIMAL => BIG_DEC_TYPE_INFO
>>
>>
>>
>>   

Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Aljoscha Krettek
Hi,
yes it can be done, in fact I have code like this in the Beam-on-Flink
runner:

// we have to manually contruct the two-input transform because we're not
// allowed to have only one input keyed, normally.

TwoInputTransformation<
WindowedValue>,
RawUnionValue,
WindowedValue>> rawFlinkTransform = new
TwoInputTransformation<>(
keyedWorkItemStream.getTransformation(),
transformSideInputs.f1.broadcast().getTransformation(),
transform.getName(),
(TwoInputStreamOperator) doFnOperator,
outputTypeInfo,
keyedWorkItemStream.getParallelism());

rawFlinkTransform.setStateKeyType(keyedWorkItemStream.getKeyType());
rawFlinkTransform.setStateKeySelectors(keyedWorkItemStream.getKeySelector(),
null);

@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator>> outDataStream =
new SingleOutputStreamOperator(
keyedWorkItemStream.getExecutionEnvironment(),
rawFlinkTransform) {}; // we have to cheat around the ctor being
protected

keyedWorkItemStream.getExecutionEnvironment().addOperator(rawFlinkTransform);

where I manually create a TwoInputTransformation and a
SingleOutputStreamOperator. I would absolutely advise against doing that,
however.

Using the Checkpointed interface works but will lead to a program that
cannot be rescaled, i.e. the parallelism cannot be changed once we have
that feature in Flink 1.2.

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 12:15 Gyula Fóra  wrote:

> Hi,
>
> I agree with Aljoscha that needs to be solved properly, but it is
> technically possible to do it now as well (he actually had a PR a while
> back doing this.)
>
> You need to manually change how the transform method works on the
> connected stream to allow setting the key only one input. You need to use
> some reflection magic though to create the output operator if you dont want
> to recompile your own Flink version but it's definitely doable. (I use this
> technique in several of my production jobs)
>
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java#L234
>
> As for fault-tolerance you need to make sure to checkpoint the broadcasted
> state using the Checkpointed interface.
>
> Cheers,
> Gyula
>
>
> Aljoscha Krettek wrote (on Thu, 27 Oct 2016,
> 12:07):
>
> Hi Julian,
> I think it's currently not possible to do that in a fault-tolerant way.
> (The problem is that the state that results from the broadcast input also
> needs to be checkpointed, which is not possible right now.) A while back, I
> created an issue for that:
> https://issues.apache.org/jira/browse/FLINK-3659. I'm hoping we can still
> get this in in some form for Flink 1.2.
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 10:57 Julian Bauß  wrote:
>
> Hi Ufuk,
>
> Thanks for your response. Unfortunately that does not work.
> Having ValueStateDescriptors in the CoFlatMap on the connected Stream
> requires a keyBy on the connected Stream.
>
> Another solution I can think of would be this:
>
> stream1.connect(stream2)
> .map(new MergeStreamsMapFunction()) // Holds transient state
> of the last ConfigMessage and maps Stream1's data to a Tuple2 ConfigMessage>
> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
> for ValueStateDescriptors and semantically correct partitioning according
> to business logic
> .flatMap(new StatefulFlatMapFunction()) // Save latest
> received ConfigMessage-Value in ValueStateDescriptor here
> .addSink(...);
>
> I have yet to test this.
> This seems a little complicated but it might work?
>
> Best Regards,
>
> Julian
>
> 2016-10-26 16:09 GMT+02:00 Ufuk Celebi :
>
> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß 
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> > .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
> > .flatMap(new FunctionWithConfigurableState())
> > .addSink(...);
> >
> > The Stream with the actual functionality is keyedBy an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcasted to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
>
>
>


Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Gyula Fóra
Hi,

I agree with Aljoscha that this needs to be solved properly, but it is
technically possible to do it now as well (he actually had a PR a while
back doing this).

You need to manually change how the transform method works on the connected
stream to allow setting the key on only one input. You need to use some
reflection magic though to create the output operator if you don't want to
recompile your own Flink version, but it's definitely doable. (I use this
technique in several of my production jobs.)

https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java#L234

As for fault-tolerance you need to make sure to checkpoint the broadcasted
state using the Checkpointed interface.

Cheers,
Gyula
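
A minimal sketch of that last point, using plain String events and String config
messages purely for illustration (an assumption of mine, not code from this thread):
the CoFlatMapFunction keeps the latest broadcast value in an instance field and
snapshots it through the (pre-1.2) Checkpointed interface.

import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class ConfigurableCoFlatMap
        implements CoFlatMapFunction<String, String, String>, Checkpointed<String> {

    private String currentConfig;   // latest value from the broadcast config stream

    @Override
    public void flatMap1(String event, Collector<String> out) {
        // apply whatever the current config says to the data stream
        out.collect(currentConfig == null ? event : currentConfig + ":" + event);
    }

    @Override
    public void flatMap2(String config, Collector<String> out) {
        this.currentConfig = config; // every parallel instance sees every broadcast element
    }

    @Override
    public String snapshotState(long checkpointId, long checkpointTimestamp) {
        return currentConfig;        // include the config in checkpoints
    }

    @Override
    public void restoreState(String state) {
        this.currentConfig = state;
    }
}

// usage sketch: dataStream.connect(configStream.broadcast()).flatMap(new ConfigurableCoFlatMap())

As Aljoscha notes elsewhere in this thread, state checkpointed through this interface
cannot be rescaled later.
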


Aljoscha Krettek wrote (on Thu, 27 Oct 2016,
12:07):

> Hi Julian,
> I think it's currently not possible to do that in a fault-tolerant way.
> (The problem is that the state that results from the broadcast input also
> needs to be checkpointed, which is not possible right now.) A while back, I
> created an issue for that:
> https://issues.apache.org/jira/browse/FLINK-3659. I'm hoping we can still
> get this in in some form for Flink 1.2.
>
> Cheers,
> Aljoscha
>
> On Thu, 27 Oct 2016 at 10:57 Julian Bauß  wrote:
>
> Hi Ufuk,
>
> Thanks for your response. Unfortunately that does not work.
> Having ValueStateDescriptors in the CoFlatMap on the connected Stream
> requires a keyBy on the connected Stream.
>
> Another solution I can think of would be this:
>
> stream1.connect(stream2)
> .map(new MergeStreamsMapFunction()) // Holds transient state
> of the last ConfigMessage and maps Stream1's data to a Tuple2 ConfigMessage>
> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
> for ValueStateDescriptors and semantically correct partitioning according
> to business logic
> .flatMap(new StatefulFlatMapFunction()) // Save latest
> received ConfigMessage-Value in ValueStateDescriptor here
> .addSink(...);
>
> I have yet to test this.
> This seems a little complicated but it might work?
>
> Best Regards,
>
> Julian
>
> 2016-10-26 16:09 GMT+02:00 Ufuk Celebi :
>
> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß 
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> > .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
> > .flatMap(new FunctionWithConfigurableState())
> > .addSink(...);
> >
> > The Stream with the actual functionality is keyedBy an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcasted to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
>
>
>


Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Aljoscha Krettek
Hi Julian,
I think it's currently not possible to do that in a fault-tolerant way.
(The problem is that the state that results from the broadcast input also
needs to be checkpointed, which is not possible right now.) A while back, I
created an issue for that: https://issues.apache.org/jira/browse/FLINK-3659.
I'm hoping we can still get this in in some form for Flink 1.2.

Cheers,
Aljoscha

On Thu, 27 Oct 2016 at 10:57 Julian Bauß  wrote:

> Hi Ufuk,
>
> Thanks for your response. Unfortunately that does not work.
> Having ValueStateDescriptors in the CoFlatMap on the connected Stream
> requires a keyBy on the connected Stream.
>
> Another solution I can think of would be this:
>
> stream1.connect(stream2)
> .map(new MergeStreamsMapFunction()) // Holds transient state
> of the last ConfigMessage and maps Stream1's data to a Tuple2 ConfigMessage>
> .keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
> for ValueStateDescriptors and semantically correct partitioning according
> to business logic
> .flatMap(new StatefulFlatMapFunction()) // Save latest
> received ConfigMessage-Value in ValueStateDescriptor here
> .addSink(...);
>
> I have yet to test this.
> This seems a little complicated but it might work?
>
> Best Regards,
>
> Julian
>
> 2016-10-26 16:09 GMT+02:00 Ufuk Celebi :
>
> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß 
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> > .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
> > .flatMap(new FunctionWithConfigurableState())
> > .addSink(...);
> >
> > The Stream with the actual functionality is keyedBy an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcasted to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
>
>
>


Re: Broadcast Config-Values through connected Configuration Stream

2016-10-27 Thread Julian Bauß
Hi Ufuk,

Thanks for your response. Unfortunately that does not work.
Having ValueStateDescriptors in the CoFlatMap on the connected Stream
requires a keyBy on the connected Stream.

Another solution I can think of would be this:

stream1.connect(stream2)
.map(new MergeStreamsMapFunction()) // Holds transient state of
the last ConfigMessage and maps Stream1's data to a Tuple2
.keyBy(new SomeIdKeySelector()) // KeyBy Id to allow
for ValueStateDescriptors and semantically correct partitioning according
to business logic
.flatMap(new StatefulFlatMapFunction()) // Save latest received
ConfigMessage-Value in ValueStateDescriptor here
.addSink(...);

I have yet to test this.
This seems a little complicated but it might work?

Best Regards,

Julian

2016-10-26 16:09 GMT+02:00 Ufuk Celebi :

> Does the following work?
>
> stream1.keyBy().connect(stream2.broadcast())
>
> On Wed, Oct 26, 2016 at 2:01 PM, Julian Bauß 
> wrote:
> > Hello Everybody,
> >
> > I'm currently trying to change the state of a CoFlatMapFunction with the
> > help of a connected configuration-stream. The code looks something like
> > this.
> >
> > streamToBeConfigured.connect(configMessageStream)
> > .keyBy(new SomeIdKeySelecor(), new ConfigMessageKeySelector)
> > .flatMap(new FunctionWithConfigurableState())
> > .addSink(...);
> >
> > The Stream with the actual functionality is keyedBy an Id but the
> > ConfigMessages don't contain any Id to key them by. They are just
> > "key=value"-Strings that should be broadcasted to all instances of the
> > CoFlatMapFunction() regardless of what Id they are keyed by.
> >
> > Is there any way to do that?
> >
> > Best Regards,
> >
> > Julian
>


Can we do batch writes on cassandra using flink while leveraging the locality?

2016-10-27 Thread kant kodali
Can we do batch writes on Cassandra using Flink while leveraging the
locality? For example, batch writes in Cassandra will put pressure on
the coordinator, but since the connectors are built by leveraging
locality, I was wondering if we could do a batch of writes on the node where the
batch belongs?


Re: Unit testing a Kafka stream based application?

2016-10-27 Thread Niels Basjes
Thanks.

This is exactly what I needed.

Niels

On Wed, Oct 26, 2016 at 11:03 AM, Robert Metzger 
wrote:

> Hi Niels,
>
> Sorry for the late response.
>
> you can launch a Kafka Broker within a JVM and use it for testing purposes.
> Flink's Kafka connector is using that a lot for integration tests. Here is
> the code starting the Kafka server:
> https://github.com/apache/flink/blob/770f2f83a81b2810aff171b2f56390ef686f725a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java#L358
>
>
> In our tests we also start a Flink mini cluster, to submit jobs to.
> Here is an example:
> https://github.com/apache/flink/blob/f46ca39188dce1764ee6615eb6697588fdc04a2a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java#L1408
>
> I hope you can find the right code lines to copy for your purposes.
>
> Regards,
> Robert
>
>
> On Fri, Oct 21, 2016 at 4:00 PM, Niels Basjes  wrote:
>
>> Hi,
>>
>> In addition to having unit tests for the individual components (map,
>> flatmap, reduce, etc) of my application I would like to write unit tests
>> for the entire flow of my Flink application.
>>
>> My application reads from Kafka, does various processing and writes
>> output to both Kafka and files.
>>
>> This means I need a controlled (mock) Kafka broker where I can insert a
>> specific sequence of messages, my application that reads from that and then
>> writes the output somewhere else.
>>
>> What is the recommended way of doing this?
>> What tools/example projects are available for building such tests?
>>
>> Thanks.
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes


Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

2016-10-27 Thread PedroMrChaves
Hello,

I am using version 1.2-SNAPSHOT.
I will try with a stable version to see if the problem persists.

Regards,
Pedro Chaves. 





RE: TIMESTAMP TypeInformation

2016-10-27 Thread Radu Tudoran
OK. I will open the JIRA

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Thursday, October 27, 2016 9:56 AM
To: user@flink.apache.org
Subject: Re: TIMESTAMP TypeInformation

Yes, I think you are right.
TypeInfoParser needs to be extended to parse the java.sql.* types into the 
corresponding TypeInfos.

Can you open a JIRA for that?
Thanks, Fabian

2016-10-27 9:31 GMT+02:00 Radu Tudoran 
>:
Hi,

I have meanwhile dug more through this and I think I actually found a bug.

The scenario that I was trying to describe was something like

1.   You have a generic datastream with Tuple (alternatively I could move 
to row I guess) and you get the data from whatever stream (e.g., kafka, network 
socket…)

2.   In the map/flat map function you parse and instantiate the tuple 
generically

3.   In the “returns()” function of the map you enforce the types



DataStream<Tuple> = env.socketTextStream(…)
    .map(new MapFunction<String, Tuple>() {
        public Tuple map(String value) {
            Tuple out = Tuple.getTupleClass(#)
            …
            out.setField(SqlTimeTypeInfo.TIMESTAMP, 0)
            …
        }
    }).returns("Tuple#");



The problem is that if you rely on the type extraction mechanism called after
returns() to recognize TIMESTAMP as SqlTimeTypeInfo, it will not happen;
instead a GenericType will be created.
It looks like the type parsers were not extended to consider these types.


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, October 26, 2016 10:11 PM
To: user@flink.apache.org
Subject: Re: TIMESTAMP TypeInformation

Hi Radu,

I might not have complete understood your problem, but if you do

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
val t = ds.toTable(tEnv, 'a, 'b, 'c)

val results = t
.select('c + 10.seconds)
then field 'c will be of type SqlTimeTypeInfo and handled as such.
Hope this helps,
Fabian

2016-10-25 17:32 GMT+02:00 Radu Tudoran 
>:
Re-hi,

I actually realized that the problem comes from the fact that the datastream
that I am registering does not create the types properly.

I am using something like

DataStream … .returns("TupleX<,…,java.sql.Timestamp,
java.sql.Time>") … and I was expecting that these would be converted to
SqlTimeTypeInfo … but it is converted to GenericType. Any thoughts on how I could
force the type to be recognized as a SqlTimeType?


From: Radu Tudoran
Sent: Tuesday, October 25, 2016 4:46 PM
To: 'user@flink.apache.org'
Subject: TIMESTAMP TypeInformation

Hi,

I would like to create a TIMESTAMP type from the data schema. I would need this 
to match against the FlinkTypeFactory (toTypeInfo())

def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = 
relDataType.getSqlTypeName match {
case BOOLEAN => BOOLEAN_TYPE_INFO
case TINYINT => BYTE_TYPE_INFO
case SMALLINT => SHORT_TYPE_INFO
case INTEGER => INT_TYPE_INFO
case BIGINT => LONG_TYPE_INFO
case FLOAT => FLOAT_TYPE_INFO
case DOUBLE => DOUBLE_TYPE_INFO
case VARCHAR | CHAR => STRING_TYPE_INFO
case DECIMAL => BIG_DEC_TYPE_INFO

// date/time types
case DATE => SqlTimeTypeInfo.DATE
case TIME => SqlTimeTypeInfo.TIME
case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP

I tried to use create the TypeInformation by calling directly 
SqlTimeTypeInfo.TIMESTAMP . However, it seems that relDataType.getSqlTypeName 
match is of 

Re: TIMESTAMP TypeInformation

2016-10-27 Thread Fabian Hueske
Yes, I think you are right.
TypeInfoParser needs to be extended to parse the java.sql.* types into the
corresponding TypeInfos.

Can you open a JIRA for that?

Thanks, Fabian
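
Until that is fixed, one possible workaround (a sketch of my own, not something
proposed in this thread) is to bypass the string-based parser and hand returns()
an explicit TypeInformation that already carries SqlTimeTypeInfo for the
timestamp field:

import java.sql.Timestamp;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExplicitSqlTimeTypes {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Timestamp>> parsed = env
            .socketTextStream("localhost", 9999)
            .map(line -> Tuple2.of(line, new Timestamp(System.currentTimeMillis())))
            // explicit TypeInformation instead of a parsed "TupleX<...>" string,
            // so the second field keeps SqlTimeTypeInfo.TIMESTAMP rather than a GenericType
            .returns(new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.TIMESTAMP));

        parsed.print();
        env.execute("explicit SqlTimeTypeInfo sketch");
    }
}
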

2016-10-27 9:31 GMT+02:00 Radu Tudoran :

> Hi,
>
>
>
> I dig meanwhile more through this and I think I found a bug actually.
>
>
>
> The scenario that I was trying to describe was something like
>
> 1.   You have a generic datastream with Tuple (alternatively I could
> move to row I guess) and you get the data from whatever stream (e.g.,
> kafka, network socket…)
>
> 2.   In the map/flat map function you parse and instantiate the tuple
> generically
>
> 3.   In the “returns()” function of the map you enforce the types
>
>
>
> DataStream = env.socketTextStream(…)
>
> .map(new mapFunction(){
>
> Public Tuple map(String value){
>
> Tuple out  =
> Tuple.getTupleClass(#)
>
> …
>
> out.setField(SqlTimeTypeInfo.TIMESTAMP,0)
>
> …
>
>
>
> }}) .returns(“Tuple#”);
>
>
>
>
>
> The problem is that if you rely on the type extraction mechanism called
> after the returns to recognize TIMESTAMP of type SqlTimeTypeInfo it will
> not happen but instead a GenericType will be created.
>
> It looks like the type parsers were not extended to consider this types
>
>
>
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* Wednesday, October 26, 2016 10:11 PM
> *To:* user@flink.apache.org
> *Subject:* Re: TIMESTAMP TypeInformation
>
>
>
> Hi Radu,
>
> I might not have complete understood your problem, but if you do
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
>
> val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
> val t = ds.toTable(tEnv, 'a, 'b, 'c)
>
> val results = t
> .select('c + 10.seconds)
>
> then field 'c will be of type SqlTimeTypeInfo and handled as such.
>
> Hope this helps,
>
> Fabian
>
>
>
> 2016-10-25 17:32 GMT+02:00 Radu Tudoran :
>
> Re-hi,
>
>
>
> I actually realized that the problem comes from the fact that the
> datastream that I am registering does not create properly the types.
>
>
>
> I am using something like
>
>
>
> DataStream … .returns(“TupleX<,….,java.sql.Timestamp,
> java.sql.Time>”)…and I was expecting that these will be converted to
> SqlTimeTypeInfo…but it is converted to GenericType. Anythoughts how I could
> force the type to be recognize as a SqlTimeType?
>
>
>
>
>
> *From:* Radu Tudoran
> *Sent:* Tuesday, October 25, 2016 4:46 PM
> *To:* 'user@flink.apache.org'
> *Subject:* TIMESTAMP TypeInformation
>
>
>
> Hi,
>
>
>
> I would like to create a TIMESTAMP type from the data schema. I would need
> this to match against the FlinkTypeFactory (toTypeInfo())
>
>
>
> *def* toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
> relDataType.getSqlTypeName *match* {
>
> *case* BOOLEAN => BOOLEAN_TYPE_INFO
>
> *case* TINYINT => BYTE_TYPE_INFO
>
> *case* SMALLINT => SHORT_TYPE_INFO
>
> *case* INTEGER => INT_TYPE_INFO
>
> *case* BIGINT => LONG_TYPE_INFO
>
> *case* FLOAT => FLOAT_TYPE_INFO
>
> *case* DOUBLE => DOUBLE_TYPE_INFO
>
> *case* VARCHAR | CHAR => STRING_TYPE_INFO
>
> *case* DECIMAL => BIG_DEC_TYPE_INFO
>
>
>
> // date/time types
>
> *case* DATE => SqlTimeTypeInfo.DATE
>
> *case* TIME => SqlTimeTypeInfo.TIME
>
> *case* *TIMESTAMP** => SqlTimeTypeInfo.**TIMESTAMP*
>
>
>
> I tried to use create the TypeInformation by calling directly
> SqlTimeTypeInfo.TIMESTAMP . However, it seems that
> relDataType.getSqlTypeName match is of type ANY instead of 

Re: Flink Cassandra Connector is not working

2016-10-27 Thread Fabian Hueske
Hi,

a NoSuchMethodError indicates that you are using incompatible versions.
You should check that the versions of your job dependencies and the version
of the cluster you want to run the job on are the same.

Best, Fabian

2016-10-27 7:13 GMT+02:00 NagaSaiPradeep :

> Hi,
>   I am working on connecting Flink with Cassandra. When I ran the sample
> program (which I got from GitHub:
> streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/example
> ), I am getting the following error:
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.flink.streaming.api.datastream.DataStreamSink.name(Ljava/lang/String;)Lorg/apache/flink/streaming/api/datastream/DataStreamSink;
>     at org.apache.flink.streaming.connectors.cassandra.CassandraSink$CassandraPojoSinkBuilder.build(CassandraSink.java:325)
>
>
> Regards
>
>
>
>


RE: TIMESTAMP TypeInformation

2016-10-27 Thread Radu Tudoran
Hi,

I have meanwhile dug more through this and I think I actually found a bug.

The scenario that I was trying to describe was something like

1.   You have a generic datastream with Tuple (alternatively I could move 
to row I guess) and you get the data from whatever stream (e.g., kafka, network 
socket…)

2.   In the map/flat map function you parse and instantiate the tuple 
generically

3.   In the “returns()” function of the map you enforce the types



DataStream<Tuple> = env.socketTextStream(…)
    .map(new MapFunction<String, Tuple>() {
        public Tuple map(String value) {
            Tuple out = Tuple.getTupleClass(#)
            …
            out.setField(SqlTimeTypeInfo.TIMESTAMP, 0)
            …
        }
    }).returns("Tuple#");



The problem is that if you rely on the type extraction mechanism called after
returns() to recognize TIMESTAMP as SqlTimeTypeInfo, it will not happen;
instead a GenericType will be created.
It looks like the type parsers were not extended to consider these types.

Dr. Radu Tudoran
Senior Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, October 26, 2016 10:11 PM
To: user@flink.apache.org
Subject: Re: TIMESTAMP TypeInformation

Hi Radu,

I might not have complete understood your problem, but if you do

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val ds = env.fromElements( (1, 1L, new Time(1,2,3)) )
val t = ds.toTable(tEnv, 'a, 'b, 'c)

val results = t
  .select('c + 10.seconds)

then field 'c will be of type SqlTimeTypeInfo and handled as such.
Hope this helps,
Fabian

2016-10-25 17:32 GMT+02:00 Radu Tudoran 
>:
Re-hi,

I actually realized that the problem comes from the fact that the datastream
that I am registering does not create the types properly.

I am using something like

DataStream … .returns("TupleX<,…,java.sql.Timestamp,
java.sql.Time>") … and I was expecting that these would be converted to
SqlTimeTypeInfo … but it is converted to GenericType. Any thoughts on how I could
force the type to be recognized as a SqlTimeType?


From: Radu Tudoran
Sent: Tuesday, October 25, 2016 4:46 PM
To: 'user@flink.apache.org'
Subject: TIMESTAMP TypeInformation

Hi,

I would like to create a TIMESTAMP type from the data schema. I would need this 
to match against the FlinkTypeFactory (toTypeInfo())

def toTypeInfo(relDataType: RelDataType): TypeInformation[_] =
  relDataType.getSqlTypeName match {
    case BOOLEAN => BOOLEAN_TYPE_INFO
    case TINYINT => BYTE_TYPE_INFO
    case SMALLINT => SHORT_TYPE_INFO
    case INTEGER => INT_TYPE_INFO
    case BIGINT => LONG_TYPE_INFO
    case FLOAT => FLOAT_TYPE_INFO
    case DOUBLE => DOUBLE_TYPE_INFO
    case VARCHAR | CHAR => STRING_TYPE_INFO
    case DECIMAL => BIG_DEC_TYPE_INFO

    // date/time types
    case DATE => SqlTimeTypeInfo.DATE
    case TIME => SqlTimeTypeInfo.TIME
    case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP

I tried to create the TypeInformation by calling SqlTimeTypeInfo.TIMESTAMP directly.
However, it seems that the relDataType.getSqlTypeName match is of type ANY instead of
being of type TIMESTAMP.

Any thoughts on how to create the proper TIMESTAMP TypeInformation?