Re: Flink v0.10.2

2016-01-13 Thread Robert Metzger
Hi,

there are currently no planned releases. I would actually like to start
preparing for the 1.0 release soon, but the community needs to discuss that
first.

How urgently do you need a 0.10.2 release? If this is the last blocker for
using Flink in production at your company, I can push for the bugfix
release.


On Wed, Jan 13, 2016 at 8:39 AM, Welly Tambunan  wrote:

> Hi All,
>
> We are currently using the snapshot version for development because we hit a
> DataStream union error. For deployment we may need to build Flink from
> master.
>
>
> I want to ask when this version will be released. Is there a roadmap or plan I
> can look at for this release?
>
>
> Thanks a lot
>
> Cheers
> --
> Welly Tambunan
> Triplelands
>
> http://weltam.wordpress.com
> http://www.triplelands.com 
>


Re: Checkpoint for exactly-once processing

2016-01-13 Thread Don Frascuchon
Hi Stephan,

Thanks for your quick response.

So, consider an operator task that has processed two records with no barrier
incoming yet. If the task fails and must be recovered, the last consistent
snapshot will be used, which does not include information about the records
that were processed but not yet checkpointed. What happens in this situation?
Will those records be resent to the failed task afterwards, or will they be
discarded? How does Flink manage information about these records to give
exactly-once guarantees? Must the user function inside the operator be
idempotent (I am thinking about some kind of persistence in a sink task)?

Thanks in advance !


On Wed, Jan 13, 2016 at 11:17 AM, Stephan Ewen ()
wrote:

> Hi!
>
> I think there is a misunderstanding. There are no identifiers maintained
> and no individual records deleted.
>
> On recovery, all operators reset their state to a consistent snapshot:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>
>
> Greetings,
> Stephan
>
>
> On Wed, Jan 13, 2016 at 11:08 AM, Don Frascuchon 
> wrote:
>
>> Hello,
>>
>> I'm trying to understand the checkpointing process for exactly-once
>> processing in Flink, and I have some doubts.
>>
>> The documentation says that when there is a failure and the state of an
>> operator is restored, the already processed records are deleted based on
>> their identifiers.
>>
>> My doubt is: how are these identifiers maintained between two
>> checkpoints? Does Flink persist every new input record that reaches the
>> stateful operator before taking the checkpoint? Otherwise, there may be
>> messages to reprocess after a failure.
>>
>> Thanks !!!
>>
>
>


Accessing configuration in RichFunction

2016-01-13 Thread Christian Kreutzfeldt
Hi

While working on a RichFilterFunction implementation I was wondering whether
there is a better way to access configuration options read from a file during
startup. Currently I am using
getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
to get access to my settings.

The reason is that the Configuration parameter provided to the open
function does not carry my settings. That is probably because I use
this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to
pass my configuration into the environment, which in turn is not passed on as
part of the open call - I found no other way to handle configuration ;-)
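
For reference, the pattern I am currently using looks roughly like this (only a
sketch - the JobSettings class and the key names are invented for illustration):

// custom parameter holder registered with the ExecutionConfig
public static class JobSettings extends ExecutionConfig.GlobalJobParameters {

   private final Map<String, String> settings;

   public JobSettings(Map<String, String> settings) {
      this.settings = settings;
   }

   @Override
   public Map<String, String> toMap() {
      return settings;
   }
}

// in the main program, after reading the file:
executionEnvironment.getConfig().setGlobalJobParameters(new JobSettings(settingsFromFile));

// inside the RichFilterFunction:
@Override
public void open(Configuration parameters) throws Exception {
   Map<String, String> settings = getRuntimeContext()
      .getExecutionConfig()
      .getGlobalJobParameters()
      .toMap();
   // e.g. settings.get("input.encoding")
}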

My question is: who is responsible for calling the open function, where does
the configuration parameter originate (i.e. where is its content taken from),
and is it possible to define somewhere in the main program which configuration
to pass into a specific operator?

Best
  Christian


Re: Accessing configuration in RichFunction

2016-01-13 Thread Fabian Hueske
Hi Christian,

the open method is called by the Flink workers when the parallel tasks are
initialized.
The configuration parameter is the configuration object of the operator.
You can set parameters in the operator config as follows:

DataSet<String> text = ...
Configuration config = new Configuration();
config.setString("myKey", "myVal");
DataSet<Tuple2<String, Integer>> wc = text.flatMap(new Tokenizer()).withParameters(config);
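
Inside the rich function, the value can then be read from the Configuration
passed to open() - a rough sketch (the Tokenizer here is just a placeholder for
your own function):

public static class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

   private String myVal;

   @Override
   public void open(Configuration parameters) throws Exception {
      // "defaultValue" is returned if the key was not set on the operator
      myVal = parameters.getString("myKey", "defaultValue");
   }

   @Override
   public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
      for (String token : line.toLowerCase().split("\\W+")) {
         out.collect(new Tuple2<>(token, 1));
      }
   }
}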

Best, Fabian


2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt :

> Hi
>
> While working on a RichFilterFunction implementation I was wondering whether
> there is a better way to access configuration options read from a file during
> startup. Currently I am using
> getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
> to get access to my settings.
>
> The reason is that the Configuration parameter provided to the open
> function does not carry my settings. That is probably because I use
> this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to
> pass my configuration into the environment, which in turn is not passed on as
> part of the open call - I found no other way to handle configuration ;-)
>
> My question is: who is responsible for calling the open function, where does
> the configuration parameter originate (i.e. where is its content taken from),
> and is it possible to define somewhere in the main program which configuration
> to pass into a specific operator?
>
> Best
>   Christian
>


Checkpoint for exactly-once processing

2016-01-13 Thread Don Frascuchon
Hello,

I'm trying to understand the checkpointing process for exactly-once
processing in Flink, and I have some doubts.

The documentation says that when there is a failure and the state of an
operator is restored, the already processed records are deleted based on
their identifiers.

My doubt is: how are these identifiers maintained between two checkpoints?
Does Flink persist every new input record that reaches the stateful operator
before taking the checkpoint? Otherwise, there may be messages to reprocess
after a failure.

Thanks !!!


Re: Checkpoint for exactly-once processing

2016-01-13 Thread Stephan Ewen
Hi!

I think there is a misunderstanding. There are no identifiers maintained
and no individual records deleted.

On recovery, all operators reset their state to a consistent snapshot:
https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html


Greetings,
Stephan


On Wed, Jan 13, 2016 at 11:08 AM, Don Frascuchon 
wrote:

> Hello,
>
> I'm trying to understand the checkpointing process for exactly-once
> processing in Flink, and I have some doubts.
>
> The documentation says that when there is a failure and the state of an
> operator is restored, the already processed records are deleted based on
> their identifiers.
>
> My doubt is: how are these identifiers maintained between two
> checkpoints? Does Flink persist every new input record that reaches the
> stateful operator before taking the checkpoint? Otherwise, there may be
> messages to reprocess after a failure.
>
> Thanks !!!
>


Re: Flink v0.10.2

2016-01-13 Thread Welly Tambunan
Hi Robert,

We are on a deadline for a demo to management right now, ahead of production,
so it would be great to have 0.10.2 as a stable version within this week if
possible.

Cheers

On Wed, Jan 13, 2016 at 4:13 PM, Robert Metzger  wrote:

> Hi,
>
> there are currently no planned releases. I would actually like to start
> preparing for the 1.0 release soon, but the community needs to discuss that
> first.
>
> How urgently do you need a 0.10.2 release? If this is the last blocker for
> using Flink in production at your company, I can push for the bugfix
> release.
>
>
> On Wed, Jan 13, 2016 at 8:39 AM, Welly Tambunan  wrote:
>
>> Hi All,
>>
>> We are currently using the snapshot version for development because we hit a
>> DataStream union error. For deployment we may need to build Flink from
>> master.
>>
>>
>> I want to ask when this version will be released. Is there a roadmap or plan I
>> can look at for this release?
>>
>>
>> Thanks a lot
>>
>> Cheers
>> --
>> Welly Tambunan
>> Triplelands
>>
>> http://weltam.wordpress.com
>> http://www.triplelands.com 
>>
>
>


-- 
Welly Tambunan
Triplelands

http://weltam.wordpress.com
http://www.triplelands.com 


Re: Checkpoint for exactly-once processing

2016-01-13 Thread Stephan Ewen
Thanks, Gordon, for the nice answer!

One important thing to add: exactly-once refers to state maintained by
Flink. All side effects (changes made to the "outside" world), which
includes sinks, do in fact need to be idempotent, or they will only have
"at-least-once" semantics.

In practice, this often works very well, because results can be computed
with "exactly once" semantics in Flink and are then sent "one or more
times" to the outside world (for example a database). If that database
simply overwrites/replaces values for keys (think upsert operation), this
gives end-to-end exactly-once semantics.
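
As an illustration, a sink along the following lines stays correct even if
results are delivered more than once. This is only a sketch: it assumes a
database with an atomic upsert (PostgreSQL 9.5's INSERT ... ON CONFLICT here),
and the table, columns, and connection settings are made up.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// idempotent "upsert" sink: replaying a result only overwrites the same row
// again, so the database ends up with effectively exactly-once contents
public class UpsertSink extends RichSinkFunction<Tuple2<String, Long>> {

   private transient Connection connection;
   private transient PreparedStatement statement;

   @Override
   public void open(Configuration parameters) throws Exception {
      connection = DriverManager.getConnection(
            "jdbc:postgresql://localhost:5432/test", "postgres", "");
      statement = connection.prepareStatement(
            "INSERT INTO word_counts (word, cnt) VALUES (?, ?) "
            + "ON CONFLICT (word) DO UPDATE SET cnt = EXCLUDED.cnt");
   }

   @Override
   public void invoke(Tuple2<String, Long> value) throws Exception {
      statement.setString(1, value.f0);
      statement.setLong(2, value.f1);
      statement.executeUpdate();
   }

   @Override
   public void close() throws Exception {
      if (statement != null) {
         statement.close();
      }
      if (connection != null) {
         connection.close();
      }
   }
}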

Greetings,
Stephan


On Wed, Jan 13, 2016 at 1:31 PM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Francis,
>
> A part of every complete snapshot is the set of record positions associated
> with the barrier that triggered the checkpointing of this snapshot. The
> snapshot is completed only when all the records within the checkpoint reach
> the sinks. When a topology fails, all the operators' state will fall back to
> the latest complete snapshot (incomplete snapshots will be ignored). The data
> source will also fall back to the position recorded with this snapshot, so
> even if some data records are read again after the restore, the restored
> operators' state is clean of their effects. This way, Flink guarantees
> exactly-once effects of each record on every operator's state. The user
> functions in operators need not be implemented to be idempotent.
>
> Hope this helps answer your question!
>
> Cheers,
> Gordon
>
>
>
> --
> View this message in context:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-for-exact-once-proccessing-tp4261p4264.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>


Re: Checkpoint for exactly-once processing

2016-01-13 Thread Tzu-Li (Gordon) Tai
Hi Francis,

A part of every complete snapshot is the set of record positions associated
with the barrier that triggered the checkpointing of this snapshot. The
snapshot is completed only when all the records within the checkpoint reach
the sinks. When a topology fails, all the operators' state will fall back to
the latest complete snapshot (incomplete snapshots will be ignored). The data
source will also fall back to the position recorded with this snapshot, so
even if some data records are read again after the restore, the restored
operators' state is clean of their effects. This way, Flink guarantees
exactly-once effects of each record on every operator's state. The user
functions in operators need not be implemented to be idempotent.
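
To make that concrete, a user function only has to expose its state to Flink so
it becomes part of the snapshot, for example via the Checkpointed interface. A
minimal sketch (the counting logic is made up for illustration):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;

// the counter is snapshotted by Flink; on failure the sources are rewound and
// this state is reset to the last complete snapshot together, so the function
// needs no deduplication or idempotence of its own
public class CountingMap extends RichMapFunction<String, String>
      implements Checkpointed<Long> {

   private long count = 0;

   @Override
   public String map(String value) {
      count++;
      return count + ": " + value;
   }

   @Override
   public Long snapshotState(long checkpointId, long checkpointTimestamp) {
      return count;
   }

   @Override
   public void restoreState(Long state) {
      count = state;
   }
}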

Hope this helps answer your question!

Cheers,
Gordon



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-for-exact-once-proccessing-tp4261p4264.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Flink QuickStart: On start up, running into ClassNotFoundException: org.apache.flink.streaming.runtime.tasks.OneInputStreamTask

2016-01-13 Thread Prez Cannady
I’m experimenting with combining Spring and Flink.  I’ve successfully instrumented 
for Gradle, but Maven is emitting ClassNotFoundExceptions for items ostensibly 
on the class path.

Project is currently configured for:

1. Scala 2.10.4
2. Flink 0.9.1

I execute the following

```
# In one terminal
$ nc -lk -p  --sh-exec "cat /usr/share/dict/words | head -n 10"


# In another terminal
$ mvn clean install spring-boot:run -Drun.arguments="localhost,"

# observe output
```

The specific class not found is 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.  However, Spring 
Boot Plugin is configured to repackage a fat jar, and I can see that the class 
is present in the included flink-streaming-core jar.  Additionally, LogBack 
shows that the flink-streaming-core jar is in my classpath.


I’m hoping I’m just missing something that should be obvious.  While I wish I 
could move forward with just Gradle, unfortunately I have to support Maven 
builds.

For reference, the complete project is available here:

https://github.com/OCExercise/wordcount-processing


Additionally:

1. pom.xml
(https://github.com/OCExercise/wordcount-processing/blob/master/pom.xml)
2. build.gradle
(https://github.com/OCExercise/wordcount-processing/blob/master/build.gradle)
3. Gist containing the full exception
(https://gist.github.com/revprez/2c1fb01c40e5d6790247)

Prez Cannady  
p: 617 500 3378  
e: revp...@opencorrelate.org   
GH: https://github.com/opencorrelate   
LI: https://www.linkedin.com/in/revprez   



Re: DataStream jdbc sink

2016-01-13 Thread Matthias J. Sax
Hi,

use JDBCOutputFormatBuilder to set all required parameters:

> JDBCOutputFormatBuilder builder = JDBCOutputFormat.buildJDBCOutputFormat();
> builder.setDBUrl(...);
> // and more
> 
> var.write(builder.finish(), 0L);
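
For completeness, a fuller sketch of the wiring, reusing the parameters from
your original snippet (untested, so treat the exact calls as an assumption):

> JDBCOutputFormatBuilder builder = JDBCOutputFormat.buildJDBCOutputFormat()
>     .setDrivername("org.postgresql.Driver")
>     .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
>     .setUsername("postgres")
>     .setPassword("")
>     .setQuery("insert into XXX values (?,?,?);");
> 
> // the second argument is the millis parameter of write(), as above
> var.write(builder.finish(), 0L);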

-Matthias


On 01/13/2016 06:21 PM, Traku traku wrote:
> Hi everyone.
> 
> I'm trying to migrate some code to flink 0.10 and I'm having a problem.
> 
> I try to create a custom sink to insert the data to a postgresql
> database. My code was this.
> 
> var.output(
> // build and configure OutputFormat
> JDBCOutputFormat
> .buildJDBCOutputFormat()
> .setDrivername("org.postgresql.Driver")
> .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
> .setUsername("postgres")
> .setPassword("")
> .setQuery("insert into XXX  values  (?,?,?);") 
> .finish()
> );
> 
> Could you help me?
> 
> Best regards.
> 
> 



signature.asc
Description: OpenPGP digital signature


Re: DataStream jdbc sink

2016-01-13 Thread Traku traku
thank you!!

2016-01-13 20:51 GMT+01:00 Matthias J. Sax :

> Hi,
>
> use JDBCOutputFormatBuilder to set all required parameters:
>
> > JDBCOutputFormatBuilder builder = JDBCOutputFormat.buildJDBCOutputFormat();
> > builder.setDBUrl(...);
> > // and more
> >
> > var.write(builder.finish(), 0L);
>
> -Matthias
>
>
> On 01/13/2016 06:21 PM, Traku traku wrote:
> > Hi everyone.
> >
> > I'm trying to migrate some code to flink 0.10 and I'm having a problem.
> >
> > I try to create a custom sink to insert the data to a postgresql
> > database. My code was this.
> >
> > var.output(
> > // build and configure OutputFormat
> > JDBCOutputFormat
> > .buildJDBCOutputFormat()
> > .setDrivername("org.postgresql.Driver")
> > .setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
> > .setUsername("postgres")
> > .setPassword("")
> > .setQuery("insert into XXX  values  (?,?,?);")
> > .finish()
> > );
> >
> > Could you help me?
> >
> > Best regards.
> >
> >
>
>


Re: Flink QuickStart: On start up, running into ClassNotFoundException: org.apache.flink.streaming.runtime.tasks.OneInputStreamTask

2016-01-13 Thread Prez Cannady
Simply passing FlinkUserCodeClassLoader.class.getClassLoader() to the parent 
constructor cleared the impasse.

2016-01-13 20:06:43.637  INFO 35403 --- [   main] 
o.o.e.j.s.SocketTextStreamWordCount$ : Started SocketTextStreamWordCount. 
in 5.176 seconds (JVM running for 12.58)
[INFO] 
[INFO] BUILD SUCCESS
[INFO] 
[INFO] Total time: 11.734 s
[INFO] Finished at: 2016-01-13T20:06:43-05:00
[INFO] Final Memory: 49M/4986M
[INFO] 
2016-01-13 20:06:43.804  INFO 35403 --- [   Thread-3] 
s.c.a.AnnotationConfigApplicationContext : Closing 
org.springframework.context.annotation.AnnotationConfigApplicationContext@33248c18:
 startup date [Wed Jan 13 20:06:38 EST 2016]; root of context hierarchy
2016-01-13 20:06:43.806  INFO 35403 --- [   Thread-3] 
o.s.j.e.a.AnnotationMBeanExporter: Unregistering JMX-exposed beans on 
shutdown


All tests in flink-runtime passed after the change to `BlobLibraryCacheManager`, 
but I haven’t run the full test suite.

Is this actually an appropriate fix, or just a way to highlight a configuration 
problem? 

I assume that injecting a parent class loader when registering a task might 
break things, but I don’t know nearly enough about Flink and this code to say 
one way or another.



Prez Cannady  
p: 617 500 3378  
e: revp...@opencorrelate.org   
GH: https://github.com/opencorrelate   
LI: https://www.linkedin.com/in/revprez   









> On Jan 13, 2016, at 6:50 PM, Stephan Ewen  wrote:
> 
> Hi!
> 
> Running this is Spring, the whole classloader configuration is probably a bit 
> different than in Flink's standalone or YARN or local mode.
> 
> Can you try if the following solves your problem: 
> 
> At the end of the file "BlobLibraryCacheManager", there is the private class 
> "FlinkUserCodeClassloader".
> 
> Can you replace the current FlinkUserCodeClassloader with this?
> 
> 
> private static class FlinkUserCodeClassLoader extends URLClassLoader {
> 
>   public FlinkUserCodeClassLoader(URL[] urls) {
>   super(urls, FlinkUserCodeClassLoader.class.getClassLoader());
>   }
> }
> 
> You can also try using "Thread.currentThread().getContextClassLoader()"
> instead of "FlinkUserCodeClassLoader.class.getClassLoader()".
> 
> Let me know if one of the two solves the problem.
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Jan 13, 2016 at 7:20 PM, Prez Cannady  > wrote:
> I’m experimenting combining Spring with Flink.  I’ve successfully 
> instrumented for Gradle, but Maven is emitting ClassNotFoundExceptions for 
> items ostensibly on the class path.
> 
> Project is currently configured for:
> 
> 1. Scala 2.10.4
> 2. Flink 0.9.1
> 
> I execute the following
> 
> ```
> # In one terminal
> $ nc -lk -p  --sh-exec "cat /usr/share/dict/words | head -n 10”
> 
> 
> # In another terminal
> $ mvn clean install spring-boot:run -Drun.arguments=“localhost,”
> 
> # observe output
> ```
> 
> The specific class not found is 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.  However, Spring 
> Boot Plugin is configured to repackage a fat jar, and I can see that the 
> class is present in the included flink-streaming-core jar.  Additionally, 
> LogBack shows that the flink-streaming-core jar is in my classpath.
> 
> 
> I’m hoping I’m just missing something that should be obvious.  While I wish I 
> could move forward with just Gradle, unfortunately I have to support Maven 
> builds.
> 
> For reference, the complete project is available here:
> 
> https://github.com/OCExercise/wordcount-processing 
> 
> 
> Additionally
> 
> 1. pom.xml 
> (https://github.com/OCExercise/wordcount-processing/blob/master/pom.xml 
> )
> 2. build.grade 
> (https://github.com/OCExercise/wordcount-processing/blob/master/build.gradle 
> )
> 3. Gist containing the full exception 
> (https://gist.github.com/revprez/2c1fb01c40e5d6790247 
> )
> 
> Prez Cannady  
> p: 617 500 3378  
> e: revp...@opencorrelate.org   
> GH: https://github.com/opencorrelate   
> LI: https://www.linkedin.com/in/revprez  
>  
> 
> 



Re: DeserializationSchema isEndOfStream usage?

2016-01-13 Thread David Kim
Thanks Robert! I'll be keeping tabs on the PR.
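
In the meantime, the kind of test schema I have in mind looks roughly like this
(just a sketch - the class name and record limit are invented, and it relies on
isEndOfStream() actually being honored once the fix is in):

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// signals end-of-stream after a fixed number of records, so an integration
// test that consumes a known number of messages can terminate the job
public class BoundedStringSchema implements DeserializationSchema<String> {

   private final int maxRecords;
   private int seen;

   public BoundedStringSchema(int maxRecords) {
      this.maxRecords = maxRecords;
   }

   @Override
   public String deserialize(byte[] message) {
      seen++;
      return new String(message, StandardCharsets.UTF_8);
   }

   @Override
   public boolean isEndOfStream(String nextElement) {
      return seen >= maxRecords;
   }

   @Override
   public TypeInformation<String> getProducedType() {
      return BasicTypeInfo.STRING_TYPE_INFO;
   }
}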

Cheers,
David

On Mon, Jan 11, 2016 at 4:04 PM, Robert Metzger  wrote:

> Hi David,
>
> In theory isEndOfStream() is absolutely the right way to go for stopping
> data sources in Flink.
> That it's not working as expected is a bug. I have a pending pull request
> for adding a Kafka 0.9 connector, which fixes this issue as well (for all
> supported Kafka versions).
>
> Sorry for the inconvenience. If you want, you can check out the branch of
> the PR and build Flink yourself to get the fix.
> I hope that I can merge the connector to master this week, then, the fix
> will be available in 1.0-SNAPSHOT as well.
>
> Regards,
> Robert
>
>
>
> Sent from my iPhone
>
> On 11.01.2016, at 21:39, David Kim 
> wrote:
>
> Hello all,
>
> I saw that DeserializationSchema has an API "isEndOfStream()".
>
>
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
>
> Can *isEndOfStream* be utilized to somehow terminate a streaming flink
> job?
>
> I was under the impression that if we return "true" we can control when a
> stream can close. The use case I had in mind was controlling when
> unit/integration tests would terminate a flink job. We can rely on the fact
> that a test/spec would know how many items it expects to consume and then
> switch *isEndOfStream* to return true.
>
> Am I misunderstanding the intention for *isEndOfStream*?
>
> I also set a breakpoint on *isEndOfStream* and saw that it never was hit
> when using "FlinkKafkaConsumer082" to pass in a DeserializationSchema
> implementation.
>
> Currently testing on 1.0-SNAPSHOT.
>
> Cheers!
> David
>
>




Re: Flink message & state lifecycle.

2016-01-13 Thread Aljoscha Krettek
Hi,
the window contents are stored in state managed by the window operator at all 
times until they are purged by a Trigger returning PURGE from one of its on*() 
methods.

Out of the box, Flink does not have something akin to the lateness and cleanup 
of Google Dataflow. You can, however, implement it yourself using a custom 
Trigger. Here is an example that mimics Google Dataflow:

public class EventTimeTrigger implements Trigger<Object, TimeWindow> {
   private static final long serialVersionUID = 1L;

   private final boolean accumulating;
   private final long allowedLateness;

   private EventTimeTrigger(boolean accumulating, long allowedLateness) {
      this.accumulating = accumulating;
      this.allowedLateness = allowedLateness;
   }

   @Override
   public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
      if (time == window.maxTimestamp()) {
         if (accumulating) {
            // register the cleanup timer if we are accumulating (and allow lateness)
            if (allowedLateness > 0) {
               ctx.registerEventTimeTimer(window.maxTimestamp() + allowedLateness);
            }
            return TriggerResult.FIRE;
         } else {
            return TriggerResult.FIRE_AND_PURGE;
         }
      } else if (time == window.maxTimestamp() + allowedLateness) {
         return TriggerResult.PURGE;
      }

      return TriggerResult.CONTINUE;
   }

   @Override
   public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
   }

   @Override
   public String toString() {
      return "EventTimeTrigger()";
   }

   /**
    * Creates an event-time trigger that fires once the watermark passes the end of the window.
    *
    * Once the trigger fires all elements are discarded. Elements that arrive late immediately
    * trigger window evaluation with just this one element.
    */
   public static EventTimeTrigger discarding() {
      return new EventTimeTrigger(false, 0L);
   }

   /**
    * Creates an event-time trigger that fires once the watermark passes the end of the window.
    *
    * This trigger will not immediately discard all elements once it fires. Only after the
    * watermark passes the specified lateness are the window elements discarded, without
    * emitting a new result. If a late element arrives within the specified lateness
    * the window is computed again and a new result is emitted.
    */
   public static EventTimeTrigger accumulating(AbstractTime allowedLateness) {
      return new EventTimeTrigger(true, allowedLateness.toMilliseconds());
   }
}

You can specify a lateness, and as long as that time has not yet been reached 
the window contents will be kept and late-arriving elements will trigger window 
emission with the complete window contents.
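
Applying it would look roughly like this (just a fragment, untested; the window
size and lateness are arbitrary, and the time helpers are the ones from the
current API):

DataStream<Tuple2<String, Integer>> counts = ...;

counts
   .keyBy(0)
   .timeWindow(Time.of(1, TimeUnit.MINUTES))
   .trigger(EventTimeTrigger.accumulating(Time.of(30, TimeUnit.SECONDS)))
   .sum(1);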

Cheers,
Aljoscha
> On 13 Jan 2016, at 15:12, Andrew Coates  wrote:
> 
> Hi, 
> 
> I'm trying to understand how the lifecycle of messages / state is managed by 
> Flink, but I'm failing to find any documentation.
> 
> Specifically, if I'm using a windowed stream and a type of trigger that retains 
> the elements of the window to allow for processing of late data, e.g. 
> ContinuousEventTimeTrigger, then where are the contents of the windows, or 
> their intermediate computation results, stored, and when is the data removed?
> 
> I'm thinking in terms of Google's Dataflow API, where setting a window's 
> withAllowedLateness option allows the caller to control how long past the end 
> of a window the data should be maintained.  Does Flink have anything similar?
> 
> Thanks,
> 
> Andy



DataStream jdbc sink

2016-01-13 Thread Traku traku
Hi everyone.

I'm trying to migrate some code to Flink 0.10 and I'm having a problem.

I'm trying to create a custom sink to insert data into a PostgreSQL database.
My code was this:

var.output(
// build and configure OutputFormat
JDBCOutputFormat
.buildJDBCOutputFormat()
.setDrivername("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://127.0.0.1:5432/test")
.setUsername("postgres")
.setPassword("")
.setQuery("insert into XXX  values  (?,?,?);")
.finish()
);

Could you help me?

Best regards.


Re: Flink DataStream and KeyBy

2016-01-13 Thread Tzu-Li (Gordon) Tai
Hi Saiph,

In Flink, the key for keyBy() can be provided in different ways:
https://ci.apache.org/projects/flink/flink-docs-master/apis/programming_guide.html#specifying-keys
(the doc is for DataSet API, but specifying keys is basically the same for
DataStream and DataSet).

As described in the documentation, calls like keyBy(0) are meant for Tuples,
so it only works for DataStream[Tuple]. Other key definition types like
keyBy(new KeySelector() {...}) can basically take a DataStream of any
data type. Flink checks at runtime whether there is a conflict between the
type of the data in the DataStream and the way the key is defined.
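
For example, in the Java API (a rough fragment only; the element values are
arbitrary):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// keyBy(0) is only defined for Flink Tuple element types
DataStream<Tuple2<String, Integer>> tuples =
      env.fromElements(new Tuple2<String, Integer>("a", 1), new Tuple2<String, Integer>("b", 2));
tuples.keyBy(0);

// any other element type needs a KeySelector (or a field expression)
DataStream<String> words = env.fromElements("to", "be", "or", "not");
words.keyBy(new KeySelector<String, String>() {
   @Override
   public String getKey(String value) {
      return value;
   }
});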

Hope this helps!

Cheers,
Gordon





--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-DataStream-and-KeyBy-tp4271p4272.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Flink DataStream and KeyBy

2016-01-13 Thread Saiph Kappa
Hi,

The line «stream.keyBy(0)» only works if the stream is of type
DataStream[Tuple] - and this Tuple is not a Scala tuple but a Flink tuple
(why not use the Scala Tuple?). Currently keyBy can be applied to anything
(at least in Scala), such as DataStream[String] and
DataStream[Array[String]].

Can anyone confirm this for me?

Thanks.