[ 
https://issues.apache.org/jira/browse/FLINK-8272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16294165#comment-16294165
 ] 

Juan Miguel Cejuela commented on FLINK-8272:
--------------------------------------------

Correction: 

* I did see the log message: "Started Queryable State Proxy Server" (without 
the "the")
* Inspecting that log, I realized I had misconfigured the port for the 
queryable state client. If I recall correctly, in 1.3.2 the port was the same 
as the Flink JobManager's, but now the port must be that of the "Queryable 
State Proxy Server" and the host must be one of the TaskManagers.
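
The corrected client setup can be sketched roughly as follows. This is only an 
illustration, assuming the flink-queryable-state-client-java 1.4 dependency is 
on the classpath. The host name, the port 9069 (believed to be the default 
proxy port; the TaskManager log shows the actual value), the state name 
"my-queryable-state", the key, and the fields of Dummy are placeholders, not 
values taken from this issue:

{code}
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala._
import org.apache.flink.queryablestate.client.QueryableStateClient

// Placeholder value type standing in for the trivial case class of the report.
case class Dummy(value: Int)

object QueryStateSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: the host of a TaskManager, NOT the JobManager.
    val taskManagerHost = "localhost"
    // Assumption: the port logged with "Started Queryable State Proxy Server".
    val proxyPort = 9069

    val client = new QueryableStateClient(taskManagerHost, proxyPort)

    val keyType = createTypeInformation[String]
    val descriptor =
      new ValueStateDescriptor[Dummy]("state", createTypeInformation[Dummy])

    // args(0) is expected to hold the hex string of a running job's ID.
    val future = client.getKvState(
      JobID.fromHexString(args(0)),
      "my-queryable-state", // placeholder queryable state name
      "some-key",           // placeholder key
      keyType,
      descriptor)

    // Blocks until the proxy answers, then prints the stored value.
    println(future.join().value())
  }
}
{code}

Pointing the client at the JobManager address instead would presumably 
reproduce the IndexOutOfBoundsException quoted below, since nothing on that 
port speaks the queryable state protocol.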

Bottom line: my bad, no issue! You can close this issue. Sorry!

> 1.4 & Scala: Exception when querying the state
> ----------------------------------------------
>
>                 Key: FLINK-8272
>                 URL: https://issues.apache.org/jira/browse/FLINK-8272
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.4.0
>         Environment: mac
>            Reporter: Juan Miguel Cejuela
>
> With Flink 1.3.2 and basically the same code, except for the API changes 
> that happened in 1.4.0, I could query the state fine.
> With 1.4.0 (& Scala), I get the exception below. The most important line: 
> {{java.util.concurrent.CompletionException: 
> java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds 
> writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)}}
> Somehow it seems that the connection to the underlying buffer fails. Perhaps 
> most informative is that, if I give a wrong {{JobId}} (i.e. one that is not 
> running), I still get the same error. So I guess no connection succeeds. 
> Also, to avoid possible serialization errors, I am just querying with a key 
> of type {{String}} and, for testing, a dummy & trivial {{case class}}.
> I create the necessary types such as:
> {code}
> implicit val typeInformationString = createTypeInformation[String]
> implicit val typeInformationDummy = createTypeInformation[Dummy]
> implicit val valueStateDescriptorDummy =
>     new ValueStateDescriptor("state", typeInformationDummy)
> {code}
> And then I query the state like this:
> {code}
> val javaCompletableFuture = client
>         .getKvState(this.flinkJobIdObject, this.queryableState, key,
>             typeInformationString, valueStateDescriptorDummy)
> {code}
> Does anybody have any clue on this? Let me know what other information I 
> should provide.
> ---
> Full stack trace:
> {code}
> java.util.concurrent.CompletionException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
>       at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>       at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>       at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
>       at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at org.apache.flink.queryablestate.network.Client$PendingConnection.lambda$handInChannel$0(Client.java:289)
>       at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>       at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>       at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>       at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>       at org.apache.flink.queryablestate.network.Client$EstablishedConnection.close(Client.java:432)
>       at org.apache.flink.queryablestate.network.Client$EstablishedConnection.onFailure(Client.java:505)
>       at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:92)
>       at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>       at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>       at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
>       at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>       at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>       at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
>       at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
>       at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
>       at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
>       at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(4) exceeds writerIndex(0): PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 0)
>       at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1166)
>       at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.readInt(AbstractByteBuf.java:619)
>       at org.apache.flink.queryablestate.network.messages.MessageSerializer.deserializeHeader(MessageSerializer.java:231)
>       at org.apache.flink.queryablestate.network.ClientHandler.channelRead(ClientHandler.java:76)
>       ... 16 common frames omitted
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
