Hi,
I have written ETL jobs in Flink (DataSet API). When I execute them in the
IDE, they run and finish fine. When I try to run them on my cluster, I get
an "Insufficient number of network buffers" error.
I have 5 machines in my cluster with 4 cores each. Each TaskManager is
given 3GB. I increased the
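For reference, the number of network buffers is set in flink-conf.yaml; a minimal sketch (the values are illustrative, not from the original mail, and the heuristic is the one from the Flink docs of that era):

```yaml
# flink-conf.yaml -- illustrative values.
# Docs rule of thumb: numberOfBuffers >= #slots-per-TM^2 * #TMs * 4,
# so with 4 slots per TaskManager and 5 TaskManagers: 4^2 * 5 * 4 = 320.
# Jobs with many shuffles/repartitions may need considerably more.
taskmanager.network.numberOfBuffers: 4096
```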
Looking over the code, I see that Flink creates TimeWindow objects each
time the WindowAssigner is invoked. I have not yet tested this, but I am
wondering if this can become problematic if you have a very long sliding
window with a small slide, such as a 24 hour window with a 1 minute slide.
It
Thanks for the suggestion. I ended up implementing it a different way.
What is needed is a mechanism to give each stream a different window
assigner, and then let Flink perform the join normally given the assigned
windows.
Specifically, for my use case what I need is a sliding window for one
Sorry for the previous incomplete email. Didn't realize I hit send!
I was facing a weird compilation error in Scala when I did
val joinedStream = stream1.connect(stream2)
.transform("funName", outTypeInfo, joinOperator)
It turned out to be due to a difference in API signature between Scala and
Hello,
I'm fac
val stream = env.addSource(new FlinkKafkaConsumer09[String]("test-topic",
new SimpleStringSchema(), properties))
val bidderStream: KeyedStream[BidderRawLogs, Int] = stream.flatMap(b =>
BidderRawLogs(b)).keyBy(b => b.strategyId)
val metaStrategy: KeyedStream[(Int, String), Int] =
1. Why are you doing a join instead of something like
System.currentTimeMillis()? In the end you have a tuple of your data with
the timestamp anyway... so why not just wrap your data in a Tuple2 with
the creation timestamp as additional info?
2. Are you sure that the consumer/producer machines' clocks are in sync?
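The suggestion above, sketched in plain Scala (`Record` and `stamp` are made-up names; in the actual job the wrapping would happen in a map on the stream):

```scala
// Sketch of the suggestion above: wrap each record in a Tuple2 together
// with its creation timestamp instead of joining against a separate
// timestamp stream. `Record` and `stamp` are made-up names.
case class Record(payload: String)

def stamp(r: Record): (Record, Long) =
  (r, System.currentTimeMillis())

// In the Flink job this would be e.g. stream.map(r => stamp(r)); at the
// sink, latency = System.currentTimeMillis() - stamped._2.
```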
Hi Aljoscha,
Yes, there is still a high partition/window count since I have to keyBy the
userId so that I get unique users. I believe what I see happening is that
the second window with the timeWindowAll is not getting all the results, or
the results from the previous window are changing when the
Hi John,
S3 keys are configured via Hadoop's configuration files.
Check out the documentation for AWS setups [1].
Cheers, Fabian
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/aws.html
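A minimal sketch of what that Hadoop configuration might look like (property names as in Hadoop's native S3 filesystem; check the linked docs for the names matching your Flink/Hadoop version, and the values are placeholders):

```xml
<!-- core-site.xml: illustrative sketch; see the AWS setup docs linked
     above for the authoritative property names for your version.
     Replace the placeholder values with your own credentials. -->
<configuration>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>YOUR_ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>YOUR_SECRET_KEY</value>
  </property>
</configuration>
```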
2016-05-02 20:22 GMT+02:00 John Sherwood :
> Hello all,
>
> I'm
Hello all,
I'm attempting to set up a taskmanager cluster using S3 as the
highly-available store. It looks like the main thing is just setting the
`state.backend.fs.checkpointdir` to the appropriate s3:// URI, but as
someone rather new to accessing S3 from Java, how should I provide Flink
with
Hi everyone,
I have implemented a way to measure latency in a DataStream (I hope): I'm
consuming a Kafka topic and I'm union'ing the resulting stream with a
custom source that emits a (machine-local) timestamp every 1000ms (using
currentTimeMillis). On the consuming end I'm distinguishing between
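A plain-Scala sketch of the scheme described above (all names made up): the marker records from the custom source are told apart from real data on the consuming end and compared against the local clock:

```scala
// Sketch of the latency measurement described above (names made up).
// The unioned stream carries either real records or marker timestamps;
// on the consuming end a marker's emission time is compared against the
// local clock. This only measures true latency if the two clocks are in
// sync (or the marker never leaves the machine).
sealed trait Event
case class Data(payload: String) extends Event
case class Marker(emittedAtMillis: Long) extends Event

def latencyMillis(e: Event, nowMillis: Long): Option[Long] = e match {
  case Marker(t) => Some(nowMillis - t)
  case _         => None
}
```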
Hi Gyula,
Could you explain a bit why I wouldn't want the centroids to be collected
after every point?
I mean, once I get a streamed point via the map1 function, I would want to
compare the distance of the point with a centroid which arrives via the
map2 function, and I keep on comparing for every
Hi,
as I understand it, the order of elements will not be preserved across
iteration supersteps. But maybe someone else knows more.
Cheers,
Aljoscha
On Thu, 28 Apr 2016 at 00:23 David Kim
wrote:
> Hello all,
>
> I read the documentation at [1] on iterations and
Hi Henry,
yes, with early firings you would have the problem of duplicate emission.
I'm afraid I don't have a solution for that right now.
For the "another question" I think you are right that this would be session
windowing. Please have a look at this blog post that I wrote recently:
Hello all,
In a DataSet, the *first(n)* function can be called to get the first n
elements of the DataSet; how can a similar operation be done on a
DataStream to get n elements from the current DataStream?
Best Regards,
Subash Basnet
Hi Ken,
When you're running on YARN, the Flink configuration is created once and
shared among all nodes (JobManager and TaskManagers). Please have a
look at the JobManager tab on the web interface. It shows you the
configuration.
Cheers,
Max
On Fri, Apr 29, 2016 at 3:18 PM, Ken Krugler
Hi Gyula,
I understand more now how this thing might work and it's fascinating.
Although I still have one question about the coflatmap function.
First, let me explain what I understand and whether it's correct or not:
1. The connected iterative stream ensures that the coflatmap function
receive
It solved my problem!
On Mon, May 2, 2016 at 3:45 PM, Fabian Hueske wrote:
> Grouping a grouped dataset is not supported.
> You can group on multiple keys: dataSet.groupBy(1,2).
>
> Can you describe your use case if that does not solve the problem?
>
>
>
> 2016-05-02 10:34
I think there is a problem with the interaction of legacy OutputFormats and
streaming programs: flush() is not called, and the CsvOutputFormat only
writes on flush(), therefore we don't see any results.
On Mon, 2 May 2016 at 11:59 Fabian Hueske wrote:
> Have you checked the log
Grouping a grouped dataset is not supported.
You can group on multiple keys: dataSet.groupBy(1,2).
Can you describe your use case if that does not solve the problem?
2016-05-02 10:34 GMT+02:00 Punit Naik :
> Hello
>
> I wanted to perform a groupBy on an already grouped
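Fabian's dataSet.groupBy(1, 2) suggestion, illustrated with plain Scala collections (sample data made up; positional keys 1 and 2 correspond to tuple fields _2 and _3):

```scala
// Plain-Scala illustration of grouping by two keys at once, analogous
// to dataSet.groupBy(1, 2) on a DataSet of Tuple3s. Sample data is
// made up for illustration.
val data = Seq(("a", 1, "x"), ("b", 1, "x"), ("c", 2, "y"))

// One grouping step with a composite key, instead of grouping twice:
val grouped: Map[(Int, String), Seq[(String, Int, String)]] =
  data.groupBy(t => (t._2, t._3))
```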
Yes, it looks like the connector only creates the connection once when it
starts and fails if the host is no longer reachable.
It should be possible to catch that failure and try to re-open the
connection.
I opened a JIRA for this issue (FLINK-3857).
Would you like to implement the improvement?
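The shape of the suggested improvement, as a purely illustrative sketch (the real fix would live inside the connector's sink, per FLINK-3857; `openConnection` stands in for whatever client-creation call the connector uses):

```scala
// Illustrative retry-on-failure shape for re-opening a connection, as
// suggested for the Elasticsearch sink (FLINK-3857). `openConnection`
// is a made-up stand-in for the connector's client-creation call.
def withRetry[T](attempts: Int)(openConnection: () => T): T = {
  try openConnection()
  catch {
    case e: Exception if attempts > 1 =>
      // Connection failed, e.g. DNS now points to a new cluster;
      // rebuild the client instead of reusing the stale connection.
      withRetry(attempts - 1)(openConnection)
  }
}
```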
Have you checked the log files as well?
2016-05-01 14:07 GMT+02:00 subash basnet :
> Hello there,
>
> If anyone could help me know why the below *result* DataStream gets
> written as text, but not as csv? As it's in a tuple format I guess it
> should be the same for both
Hi Nirmalya,
the solution with List.size() won't use a combiner and won't be efficient
for large data sets with large groups.
I would recommend adding a 1 and using GroupedDataSet.sum().
2016-05-01 12:48 GMT+02:00 nsengupta :
> Hello all,
>
> This is how I have moved
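Fabian's suggestion illustrated with plain Scala collections (sample data made up); in the Flink job it would be a map to (key, 1) followed by groupBy(0).sum(1), which lets the runtime pre-aggregate with a combiner:

```scala
// Counting group sizes by mapping to (key, 1) and summing the ones,
// instead of materializing each group as a List and calling size().
// Sample data is made up for illustration.
val words = Seq("a", "b", "a", "a", "b")

val counts: Map[String, Int] =
  words.map(w => (w, 1))
       .groupBy(_._1)
       .map { case (k, pairs) => (k, pairs.map(_._2).sum) }
```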
The slot configuration should depend on the complexity of jobs.
Since each slot runs a "slice" of a program, one slot might potentially
execute many concurrent tasks.
For complex jobs you should allocate more than one core for each slot.
2016-05-02 10:12 GMT+02:00 Robert Metzger
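A minimal flink-conf.yaml sketch of that advice (the value is illustrative, not from the thread):

```yaml
# flink-conf.yaml -- illustrative: on a 4-core machine, allocating fewer
# slots than cores gives each slot (i.e. each pipeline slice) more than
# one core to work with for complex jobs.
taskmanager.numberOfTaskSlots: 2
```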
Hi Aljoscha
Thanks for your answer!
I tried using returns() but it does not work, since the only place where I
could call it is within the function that has all the generic types, so
there is no useful type hint to give. I could make the user hand over the
class definition for the type as well, but
Hi,
for user functions that have generics, such as yours, you have to
manually specify the types somehow. This can either be done using
InputTypeConfigurable/OutputTypeConfigurable or maybe using
stream.returns().
Cheers,
Aljoscha
On Fri, 29 Apr 2016 at 12:25 Martin Neumann
Hello
I wanted to perform a groupBy on an already grouped dataset. How do I do
this?
--
Thank You
Regards
Punit Naik
Hi,
what do you mean by "still experiencing the same issues"? Is the key count
still very high, i.e. 500k windows?
For the watermark generation, specifying a lag of 2 days is very
conservative. If the watermark is this conservative, I guess there will
never arrive elements that are behind the
Hi,
I'm sorry for the inconvenience, for the -SNAPSHOT release versions one
must also append the address of the repository to the command, like this:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
Hi,
When using the Elasticsearch connector, is there a way to reflect an IP
change of the Elasticsearch cluster?
We use DNS of Elasticsearch in data sink, e.g. elasticsearch-dev.foo.de.
However, when we replace the old Elasticsearch cluster with a new one, the
Elasticsearch connector cannot write into the
I have a DataSet which contains only strings. But when I execute
writeAsText and supply a folder path as the string, it finishes with the
following output but does not write any text files:
org.apache.flink.api.java.operators.DataSink[String] = DataSink ''
(TextOutputFormat