By default, flushOnCheckpoint is set to True.
So ideally, based on env.enableCheckpointing(300000); ---- the flush to ES
must be triggered every 30seconds, though our ES Flush timeout is 60
seconds.
If the above assumption is correct, then still we do not see packets
getting flushed till the next packet/batch arrives.

Thanks.

On Fri, Jun 21, 2019 at 6:07 PM Ramya Ramamurthy <hair...@gmail.com> wrote:

> Yes, we do maintain checkpoints
> env.enableCheckpointing(300000);
>
> But we assumed it is for Kafka consumer offsets. Not sure how this is
> useful in this case? Can you pls. elaborate on this.
>
> ~Ramya.
>
>
>
> On Fri, Jun 21, 2019 at 4:33 PM miki haiat <miko5...@gmail.com> wrote:
>
>> Did you set some checkpoints  configuration?
>>
>> On Fri, Jun 21, 2019, 13:17 Ramya Ramamurthy <hair...@gmail.com> wrote:
>>
>> > Hi,
>> >
>> > We use Kafka->Flink->Elasticsearch in our project.
>> > The data to the elasticsearch is not getting flushed, till the next
>> batch
>> > arrives.
>> > E.g.: If the first batch contains 1000 packets, this gets pushed to the
>> > Elastic, only after the next batch arrives [irrespective of reaching the
>> > batch time limit].
>> > Below are the sink configurations we use  currently.
>> >
>> > esSinkBuilder.setBulkFlushMaxActions(2000); // 2K records
>> > esSinkBuilder.setBulkFlushMaxSizeMb(5); // 5 Mb once
>> > esSinkBuilder.setBulkFlushInterval(60000); // 1 minute once
>> > esSinkBuilder.setBulkFlushBackoffRetries(3); // Retry three times if
>> bulk
>> > fails
>> > esSinkBuilder.setBulkFlushBackoffDelay(5000); // Retry after 5 seconds
>> > esSinkBuilder.setBulkFlushBackoff(true);
>> >
>> > Sink code :
>> > List<HttpHost> httpHosts = new ArrayList<>();
>> > //httpHosts.add(new HttpHost("10.128.0.34", 9200, "http"));
>> > httpHosts.add(new HttpHost("192.168.80.171", 9200, "http"));
>> >
>> > ElasticsearchSink.Builder<Row> esSinkBuilder = new
>> > ElasticsearchSink.Builder<>(
>> > httpHosts,
>> > new ElasticsearchSinkFunction<Row>() {
>> >
>> > private IndexRequest createIndexRequest(byte[] document, String
>> indexDate)
>> > {
>> >
>> > return new IndexRequest(esIndex + indexDate, esType)
>> > .source(document, XContentType.JSON);
>> >
>> > }
>> >
>> > @Override
>> > public void process(Row r, RuntimeContext runtimeContext, RequestIndexer
>> > requestIndexer) {
>> > byte[] byteArray = serializationSchema.serialize(r);
>> >
>> > ObjectMapper mapper = new ObjectMapper();
>> > ObjectWriter writer = mapper.writer();
>> >
>> > try {
>> > JsonNode jsonNode = mapper.readTree(byteArray);
>> >
>> > long tumbleStart = jsonNode.get("fseen").asLong();
>> >
>> > ZonedDateTime utc =
>> > Instant.ofEpochMilli(tumbleStart).atZone(ZoneOffset.UTC);
>> > String indexDate =
>> DateTimeFormatter.ofPattern("yyyy.MM.dd").format(utc);
>> >
>> > byte[] document = writer.writeValueAsBytes(jsonNode);
>> >
>> > requestIndexer.add(createIndexRequest(document, indexDate));
>> > } catch (Exception e) {
>> > System.out.println("In the error block");
>> > }
>> >
>> > }
>> > }
>> > );
>> >
>> > Has anyone faced this issue? Any help would be appreciated !!
>> >
>> > Thanks,
>> >
>>
>

Reply via email to