[
https://issues.apache.org/jira/browse/FLINK-38294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chu Xue updated FLINK-38294:
----------------------------
Description:
The Hbase connector will not flush when write less than
1000(sink.buffer-flush.max-rows) records or execute in less than
1s(sink.buffer-flush.interval).Org.apache.hadoop.hbase.client.BufferedMutator
excute flush() when close() and flush() maybe failed.
{code:java}
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
this.executor =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
this.scheduledFuture =
this.executor.scheduleWithFixedDelay(
() -> {
if (closed) {
return;
}
try {
flush();
} catch (Exception e) {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an
exception
failureThrowable.compareAndSet(null, e);
}
},
bufferFlushIntervalMillis,
bufferFlushIntervalMillis,
TimeUnit.MILLISECONDS);
} {code}
{code:java}
@SuppressWarnings("rawtypes")
@Overridepublic
void invoke(T value, Context context) throws Exception {
checkErrorAndRethrow();
mutator.mutate(mutationConverter.convertToMutation(value));
//flush when the buffer number of mutations greater than the configured
max size.
if (bufferFlushMaxMutations > 0
&& numPendingRequests.incrementAndGet() >=
bufferFlushMaxMutations) {
flush();
} else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes ==
0) {
flush();
}}
{code}
{code:java}
@Override
public void close() throws Exception {
closed = true;
if (mutator != null) {
try {
mutator.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
BufferedMutator.", e);
}
this.mutator = null;
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
Connection.", e);
}
this.connection = null;
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}} {code}
For example, creating a permission denial case where the user does not have
permission(ranger) for the hbase namespace.The task will failed ,but return
success.
[^jobmanager.log]
[^taskmanager.log]
Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close like this,
throw the error.
{code:java}
@Override
public void close() throws Exception {
closed = true;
if (mutator != null) {
try {
mutator.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
BufferedMutator.", e);
}
this.mutator = null;
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
Connection.", e);
}
this.connection = null;
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}
//add check
checkErrorAndRethrow();
} {code}
was:
The Hbase connector will not flush when write less than
1000(sink.buffer-flush.max-rows) records or execute in less than
1s(sink.buffer-flush.interval).Org.apache.hadoop.hbase.client.BufferedMutator
excute flush() when close() and flush() maybe failed.
{code:java}
if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
this.executor =
Executors.newScheduledThreadPool(
1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
this.scheduledFuture =
this.executor.scheduleWithFixedDelay(
() -> {
if (closed) {
return;
}
try {
flush();
} catch (Exception e) {
// fail the sink and skip the rest of the items
// if the failure handler decides to throw an
exception
failureThrowable.compareAndSet(null, e);
}
},
bufferFlushIntervalMillis,
bufferFlushIntervalMillis,
TimeUnit.MILLISECONDS);
} {code}
{code:java}
@SuppressWarnings("rawtypes")
@Overridepublic
void invoke(T value, Context context) throws Exception {
checkErrorAndRethrow();
mutator.mutate(mutationConverter.convertToMutation(value));
//flush when the buffer number of mutations greater than the configured
max size.
if (bufferFlushMaxMutations > 0
&& numPendingRequests.incrementAndGet() >=
bufferFlushMaxMutations) {
flush();
} else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes ==
0) { flush();
}}
{code}
{code:java}
@Override
public void close() throws Exception {
closed = true;
if (mutator != null) {
try {
mutator.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
BufferedMutator.", e);
}
this.mutator = null;
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
Connection.", e);
}
this.connection = null;
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}} {code}
For example, creating a permission denial case where the user does not have
permission(ranger) for the hbase namespace.The task will failed ,but return
success.
[^jobmanager.log]
[^taskmanager.log]
Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close like this,
throw the error.
{code:java}
@Override
public void close() throws Exception {
closed = true;
if (mutator != null) {
try {
mutator.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
BufferedMutator.", e);
}
this.mutator = null;
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
LOG.warn("Exception occurs while closing HBase
Connection.", e);
}
this.connection = null;
}
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
if (executor != null) {
executor.shutdownNow();
}
}
//add check
checkErrorAndRethrow();
} {code}
> Hbase connector misclassifies failed task as successful
> -------------------------------------------------------
>
> Key: FLINK-38294
> URL: https://issues.apache.org/jira/browse/FLINK-38294
> Project: Flink
> Issue Type: Bug
> Components: Connectors / HBase
> Affects Versions: 1.18.1, 1.19.3, 1.20.2
> Reporter: Chu Xue
> Priority: Major
> Attachments: jobmanager.log, taskmanager.log
>
>
> The Hbase connector will not flush when write less than
> 1000(sink.buffer-flush.max-rows) records or execute in less than
> 1s(sink.buffer-flush.interval).Org.apache.hadoop.hbase.client.BufferedMutator
> excute flush() when close() and flush() maybe failed.
> {code:java}
> if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
> this.executor =
> Executors.newScheduledThreadPool(
> 1, new
> ExecutorThreadFactory("hbase-upsert-sink-flusher"));
> this.scheduledFuture =
> this.executor.scheduleWithFixedDelay(
> () -> {
> if (closed) {
> return;
> }
> try {
> flush();
> } catch (Exception e) {
> // fail the sink and skip the rest of the items
> // if the failure handler decides to throw an
> exception
> failureThrowable.compareAndSet(null, e);
> }
> },
> bufferFlushIntervalMillis,
> bufferFlushIntervalMillis,
> TimeUnit.MILLISECONDS);
> } {code}
> {code:java}
> @SuppressWarnings("rawtypes")
> @Overridepublic
> void invoke(T value, Context context) throws Exception {
> checkErrorAndRethrow();
> mutator.mutate(mutationConverter.convertToMutation(value));
> //flush when the buffer number of mutations greater than the configured
> max size.
> if (bufferFlushMaxMutations > 0
> && numPendingRequests.incrementAndGet() >=
> bufferFlushMaxMutations) {
> flush();
> } else if (bufferFlushMaxMutations == 0 && bufferFlushMaxSizeInBytes ==
> 0) {
> flush();
> }}
> {code}
> {code:java}
> @Override
> public void close() throws Exception {
> closed = true;
> if (mutator != null) {
> try {
> mutator.close();
> } catch (IOException e) {
> LOG.warn("Exception occurs while closing HBase
> BufferedMutator.", e);
> }
> this.mutator = null;
> }
> if (connection != null) {
> try {
> connection.close();
> } catch (IOException e) {
> LOG.warn("Exception occurs while closing HBase
> Connection.", e);
> }
> this.connection = null;
> }
> if (scheduledFuture != null) {
> scheduledFuture.cancel(false);
> if (executor != null) {
> executor.shutdownNow();
> }
> }} {code}
> For example, creating a permission denial case where the user does not have
> permission(ranger) for the hbase namespace.The task will failed ,but return
> success.
> [^jobmanager.log]
> [^taskmanager.log]
>
> Modify org.apache.flink.connector.hbase.sink.HBaseSinkFunction#close like
> this, throw the error.
> {code:java}
> @Override
> public void close() throws Exception {
> closed = true;
> if (mutator != null) {
> try {
> mutator.close();
> } catch (IOException e) {
> LOG.warn("Exception occurs while closing HBase
> BufferedMutator.", e);
> }
> this.mutator = null;
> }
> if (connection != null) {
> try {
> connection.close();
> } catch (IOException e) {
> LOG.warn("Exception occurs while closing HBase
> Connection.", e);
> }
> this.connection = null;
> }
> if (scheduledFuture != null) {
> scheduledFuture.cancel(false);
> if (executor != null) {
> executor.shutdownNow();
> }
> }
> //add check
> checkErrorAndRethrow();
> } {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)