[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: e834b2b2f1d86f305fc2db1857bbb13ede7773bd / chaokunyang fix java8 utf-16 string serialization Report URL: https://github.com/apache/incubator-fury/actions/runs/8626510470 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: 20223f2366f85e59fb11dd2cc323e33a594bbe53 / chaokunyang fix str tests Report URL: https://github.com/apache/incubator-fury/actions/runs/8626359780 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: 60be83a6bc74993cc23e1b0dabe01b0bf92f7afb / chaokunyang use fury_serialize Report URL: https://github.com/apache/incubator-fury/actions/runs/8625806181 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: 16d6e8a63c1c092bd5a3dba444c746e2edfae552 / chaokunyang refactor string read by merge coder with size Report URL: https://github.com/apache/incubator-fury/actions/runs/8625792157 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[PR] feat(java): optimize string serialization by concating coder and length [incubator-fury]
chaokunyang opened a new pull request, #1486: URL: https://github.com/apache/incubator-fury/pull/1486 ## What does this PR do? ## Related issues ## Does this PR introduce any user-facing change? - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" is working again!
The GitHub Actions job "Fury CI" on incubator-fury.git has succeeded. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: b26288df96f577a976ea1c2526c018cf4fadb7cf / Shawn Yang Merge branch 'main' into channel_stream_reader Report URL: https://github.com/apache/incubator-fury/actions/runs/8625078112 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: b26288df96f577a976ea1c2526c018cf4fadb7cf / Shawn Yang Merge branch 'main' into channel_stream_reader Report URL: https://github.com/apache/incubator-fury/actions/runs/8625078112 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
chaokunyang commented on PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#issuecomment-2046382532 > > But I agree with you that we can provide log level for reduce info and other logs print. > > Yes, we can control the output of other levels of Log. > > > The log level api has its cost too. Jvm can't eliminate the condition checks, you can see how kryo use a static final constant to reduce the log cost > > FuryLogger is only used internally. We can add a `static final` Level field in FuryLogger to control the output Log level. In this way, Log Level does not support dynamic setting. Nice, looks good to me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] chore: rename unsigned varint/varlong method name [incubator-fury]
chaokunyang commented on PR #1480: URL: https://github.com/apache/incubator-fury/pull/1480#issuecomment-2046355891 Hi @tommyettinger, do you have any suggestions? You used `MemoryBuffer` widely inhttps://github.com/tommyettinger/tantrum. Will it be better if we rename all `read/write/get/put Int/Long/Float/Double` in methods of `MemoryBuffer` to `read/write/get/put Int32/Int64/Float32/Float64` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
LiangliangSui commented on PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#issuecomment-204561 > But I agree with you that we can provide log level for reduce info and other logs print. Yes, we can control the output of other levels of Log. > The log level api has its cost too. Jvm can't eliminate the condition checks, you can see how kryo use a static final constant to reduce the log cost FuryLogger is only used internally. We can add a `static final` Level field in FuryLogger to control the output Log level. In this way, Log Level does not support dynamic setting, which saves the overhead of JVM condition checking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
chaokunyang commented on PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#issuecomment-2045512782 > We cannot guarantee that `Log.debug` will not be used. If `Log.debug` is used in some frequently executed locations, a large amount of logs will be output because we have no level control. > > Maybe we can open an Issue and add the `good first use` tag to provide training opportunities for new developers who want to join the Fury community. This need is not very urgent for us. The debug log should be avoided in critical path. But I agree with you that we can provide log level for reduce info and other logs print. I prefer not to provide debug api in fury Logger. Without this API, users won't print debug log. The log level api has its cost too. Jvm can't eliminate the condition checks, you can see how kryo use a static final constant to reduce the log cost -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
LiangliangSui commented on PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#issuecomment-2045412314 We cannot guarantee that `Log.debug` will not be used. If `Log.debug` is used in some frequently executed locations, a large amount of logs will be output because we have no level control. Maybe we can open an Issue and add the `good first use` tag to provide training opportunities for new developers who want to join the Fury community. This need is not very urgent for us. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
chaokunyang commented on code in PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#discussion_r1557720365 ## java/fury-core/src/main/java/org/apache/fury/logging/FuryLogger.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fury.logging; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class FuryLogger implements Logger { + private static final DateTimeFormatter dateTimeFormatter = + DateTimeFormatter.ofPattern("-MM-dd hh:mm:ss"); + private final String name; + + public FuryLogger(Class targetClass) { +this.name = targetClass.getSimpleName(); + } + + // The implementation should not forward to other method, otherwise the fileNumber won't be right. + @Override + public void info(String msg) { +log("INFO", msg, new Object[0], false); + } + + @Override + public void info(String msg, Object arg) { +log("INFO", msg, new Object[] {arg}, false); + } + + @Override + public void info(String msg, Object arg1, Object arg2) { +log("INFO", msg, new Object[] {arg1, arg2}, false); + } + + @Override + public void info(String msg, Object... args) { +log("INFO", msg, args, false); + } + + @Override + public void warn(String msg) { +log("WARN", msg, new Object[0], false); + } + + @Override + public void warn(String msg, Object arg) { +log("WARN", msg, new Object[] {arg}, true); + } + + @Override + public void warn(String msg, Object arg1, Object arg2) { +log("WARN", msg, new Object[] {arg1, arg2}, true); + } + + @Override + public void warn(String msg, Object... args) { +log("WARN", msg, args, true); + } + + @Override + public void warn(String msg, Throwable t) { +log("WARN", msg, new Object[] {t}, true); + } + + @Override + public void error(String msg) { +log("ERROR", msg, new Object[0], false); + } + + @Override + public void error(String msg, Object arg) { +log("ERROR", msg, new Object[] {arg}, true); + } + + @Override + public void error(String msg, Object arg1, Object arg2) { +log("ERROR", msg, new Object[] {arg1, arg2}, true); + } + + @Override + public void error(String msg, Object... args) { +log("ERROR", msg, args, true); + } + + @Override + public void error(String msg, Throwable t) { +log("ERROR", msg, new Object[] {t}, true); + } + + private void log(String level, String msg, Object[] args, boolean mayPrintTrace) { Review Comment: Good point -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
chaokunyang commented on PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#issuecomment-2045293452 > Don't we need to control the log level of printing? I see that all current levels of logs can be printed. This is for Fury internal use. If users need such control, he can use slf4j. Such support in Fury is better but not a strong requirement. Maybe we can support it in future -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
LiangliangSui commented on code in PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#discussion_r1557659445 ## java/fury-core/src/main/java/org/apache/fury/logging/FuryLogger.java: ## @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fury.logging; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class FuryLogger implements Logger { + private static final DateTimeFormatter dateTimeFormatter = + DateTimeFormatter.ofPattern("-MM-dd hh:mm:ss"); + private final String name; + + public FuryLogger(Class targetClass) { +this.name = targetClass.getSimpleName(); + } + + // The implementation should not forward to other method, otherwise the fileNumber won't be right. + @Override + public void info(String msg) { +log("INFO", msg, new Object[0], false); + } + + @Override + public void info(String msg, Object arg) { +log("INFO", msg, new Object[] {arg}, false); + } + + @Override + public void info(String msg, Object arg1, Object arg2) { +log("INFO", msg, new Object[] {arg1, arg2}, false); + } + + @Override + public void info(String msg, Object... args) { +log("INFO", msg, args, false); + } + + @Override + public void warn(String msg) { +log("WARN", msg, new Object[0], false); + } + + @Override + public void warn(String msg, Object arg) { +log("WARN", msg, new Object[] {arg}, true); + } + + @Override + public void warn(String msg, Object arg1, Object arg2) { +log("WARN", msg, new Object[] {arg1, arg2}, true); + } + + @Override + public void warn(String msg, Object... args) { +log("WARN", msg, args, true); + } + + @Override + public void warn(String msg, Throwable t) { +log("WARN", msg, new Object[] {t}, true); + } + + @Override + public void error(String msg) { +log("ERROR", msg, new Object[0], false); + } + + @Override + public void error(String msg, Object arg) { +log("ERROR", msg, new Object[] {arg}, true); + } + + @Override + public void error(String msg, Object arg1, Object arg2) { +log("ERROR", msg, new Object[] {arg1, arg2}, true); + } + + @Override + public void error(String msg, Object... args) { +log("ERROR", msg, args, true); + } + + @Override + public void error(String msg, Throwable t) { +log("ERROR", msg, new Object[] {t}, true); + } + + private void log(String level, String msg, Object[] args, boolean mayPrintTrace) { Review Comment: Could we replace `String level` with a `Level enum`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
LiangliangSui commented on PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485#issuecomment-2045222355 Don't we need to control the log level of printing? I see that all current levels of logs can be printed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
chaokunyang merged PR #1485: URL: https://github.com/apache/incubator-fury/pull/1485 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [I] 【bug】Caused by: org.graalvm.compiler.debug.GraalError: com.oracle.graal.pointsto.constraints.UnsupportedFeatureException: An object of type 'ch.qos.logback.core.status.InfoStatus' was found in
chaokunyang closed issue #1404: 【bug】Caused by: org.graalvm.compiler.debug.GraalError: com.oracle.graal.pointsto.constraints.UnsupportedFeatureException: An object of type 'ch.qos.logback.core.status.InfoStatus' was found in the image heap URL: https://github.com/apache/incubator-fury/issues/1404 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
(incubator-fury) branch main updated: feat(java): implement fury logger and remove slf4j library (#1485)
This is an automated email from the ASF dual-hosted git repository. chaokunyang pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-fury.git The following commit(s) were added to refs/heads/main by this push: new af97e62b feat(java): implement fury logger and remove slf4j library (#1485) af97e62b is described below commit af97e62b013a3418b4fc70c019486adbab5da2c9 Author: Shawn Yang AuthorDate: Tue Apr 9 21:34:17 2024 +0800 feat(java): implement fury logger and remove slf4j library (#1485) This PR implemented fury logger and remove slf4j library Closes #1484 Closes #1404 --- .../org/apache/fury/benchmark/CollectionSuite.java | 4 +- .../java/org/apache/fury/benchmark/RowSuite.java | 4 +- .../fury/benchmark/UserTypeSerializeSuite.java | 6 +- .../org/apache/fury/benchmark/state/FuryState.java | 6 +- .../apache/fury/benchmark/state/HessionState.java | 4 +- .../apache/fury/benchmark/state/JsonbState.java| 6 +- .../org/apache/fury/benchmark/state/KryoState.java | 4 +- .../apache/fury/benchmark/state/ObjectType.java| 2 +- java/fury-core/pom.xml | 1 + .../src/main/java/org/apache/fury/Fury.java| 4 +- .../org/apache/fury/builder/AccessorHelper.java| 4 +- .../org/apache/fury/codegen/CodeGenerator.java | 7 +- .../java/org/apache/fury/codegen/CompileUnit.java | 4 +- .../java/org/apache/fury/codegen/JaninoUtils.java | 32 +++-- .../java/org/apache/fury/config/FuryBuilder.java | 4 +- .../org/apache/fury/io/FuryReadableChannel.java| 16 ++- .../java/org/apache/fury/logging/FuryLogger.java | 133 + .../main/java/org/apache/fury/logging/Logger.java | 58 + .../org/apache/fury/logging/LoggerFactory.java | 61 ++ .../java/org/apache/fury/logging/NilLogger.java| 65 ++ .../java/org/apache/fury/logging/Slf4jLogger.java | 124 +++ .../apache/fury/pool/ClassLoaderFuryPooled.java| 4 +- .../apache/fury/pool/FuryPooledObjectFactory.java | 4 +- .../org/apache/fury/resolver/AllowListChecker.java | 4 +- .../org/apache/fury/resolver/ClassResolver.java| 7 +- .../org/apache/fury/serializer/JavaSerializer.java | 4 +- .../fury/serializer/ObjectStreamSerializer.java| 4 +- .../fury/serializer/ReplaceResolveSerializer.java | 4 +- .../apache/fury/serializer/StructSerializer.java | 4 +- .../collection/SynchronizedSerializers.java| 4 +- .../collection/UnmodifiableSerializers.java| 4 +- .../fury/serializer/shim/ShimDispatcher.java | 4 +- .../main/java/org/apache/fury/type/ClassDef.java | 4 +- .../org/apache/fury/util/ClassLoaderUtils.java | 4 +- .../java/org/apache/fury/util/LoggerFactory.java | 108 - .../fury-core/native-image.properties | 5 +- .../java/org/apache/fury/CrossLanguageTest.java| 4 +- .../test/java/org/apache/fury/FuryInitPerf.java| 4 +- .../org/apache/fury/builder/JITContextTest.java| 4 +- .../fury/collection/MultiKeyWeakMapTest.java | 4 +- .../org/apache/fury/logging/Slf4jLoggerTest.java} | 37 +++--- .../apache/fury/resolver/ClassResolverTest.java| 4 +- .../java/org/apache/fury/util/PlatformTest.java| 15 +-- .../fury/format/encoder/ArrayEncoderBuilder.java | 6 +- .../org/apache/fury/format/encoder/Encoders.java | 14 +-- .../fury/format/encoder/MapEncoderBuilder.java | 6 +- .../fury/format/encoder/RowEncoderBuilder.java | 6 +- .../org/apache/fury/format/CrossLanguageTest.java | 4 +- .../fury/format/row/binary/BinaryArrayTest.java| 12 +- .../fury/format/row/binary/BinaryRowTest.java | 12 +- .../fury/benchmark/DeserializationBenchmark.java | 120 +-- .../fury/benchmark/SerializationBenchmark.java | 126 +-- 52 files changed, 695 insertions(+), 400 deletions(-) diff --git a/java/benchmark/src/main/java/org/apache/fury/benchmark/CollectionSuite.java b/java/benchmark/src/main/java/org/apache/fury/benchmark/CollectionSuite.java index 597e5a83..16a469f3 100644 --- a/java/benchmark/src/main/java/org/apache/fury/benchmark/CollectionSuite.java +++ b/java/benchmark/src/main/java/org/apache/fury/benchmark/CollectionSuite.java @@ -22,10 +22,10 @@ package org.apache.fury.benchmark; import java.util.ArrayList; import java.util.List; import org.apache.fury.Fury; +import org.apache.fury.logging.Logger; +import org.apache.fury.logging.LoggerFactory; import org.openjdk.jmh.Main; import org.openjdk.jmh.annotations.Benchmark; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** Test suite for collection. */ public class CollectionSuite { diff --git a/java/benchmark/src/main/java/org/apache/fury/benchmark/RowSuite.java b/java/benchmark/src/main/java/org/apache/fury/benchmark/RowSuite.java index 576383b6..77da175f 100644 ---
Re: [I] [Java] Remove SLF4J logger library [incubator-fury]
chaokunyang closed issue #1484: [Java] Remove SLF4J logger library URL: https://github.com/apache/incubator-fury/issues/1484 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] chore: add pull request template [incubator-fury]
theweipeng merged PR #1477: URL: https://github.com/apache/incubator-fury/pull/1477 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
(incubator-fury) branch main updated: chore: add pull request template (#1477)
This is an automated email from the ASF dual-hosted git repository. wangweipeng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-fury.git The following commit(s) were added to refs/heads/main by this push: new 6ee0a2b9 chore: add pull request template (#1477) 6ee0a2b9 is described below commit 6ee0a2b9aa74859d6d44aeadf4bfcf15a888239b Author: LiangliangSui <116876207+liangliang...@users.noreply.github.com> AuthorDate: Tue Apr 9 21:33:24 2024 +0800 chore: add pull request template (#1477) --- .github/pull_request_template.md | 43 1 file changed, 43 insertions(+) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index ..6e5d4d60 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,43 @@ + + +## What does this PR do? + + + + +## Related issues + + + + +## Does this PR introduce any user-facing change? + + + +- [ ] Does this PR introduce any public API change? +- [ ] Does this PR introduce any binary protocol compatibility change? + + +## Benchmark + + - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557633949 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); Review Comment: Actually I'm considering only provide streaming interface to customized serializer to forbidden users to use API that can go back -- This is an automated message from the Apache Git Service.
[GH] (incubator-fury): Workflow run "Fury CI" is working again!
The GitHub Actions job "Fury CI" on incubator-fury.git has succeeded. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: 68f57c7866a847c987839e6c3de0bd71d2745fab / chaokunyang fix graalvm class init Report URL: https://github.com/apache/incubator-fury/actions/runs/8616562106 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557632263 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); Review Comment: For reading, there is no need for going back. For writing , we may need to go back to update some data. -- This is an automated message from the Apache Git Service. To respond to the message,
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557626053 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,114 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); +this.memoryBuffer = MemoryBuffer.fromDirectByteBuffer(directBuffer, 0, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +return readToByteBuffer0(dst, dst.remaining()); + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +readToByteBuffer0(dst, length); + } + + @Override + public int readToByteBuffer(ByteBuffer dst) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining > 0) { + buf.read(dst, remaining); +} +return remaining; Review Comment: ```suggestion return buf.read(dst, remaining) + channel.read(dst); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557624666 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -128,21 +128,21 @@ public void readToUnsafe(Object target, long targetPointer, int numBytes) { @Override public void readToByteBuffer(ByteBuffer dst, int length) { -readToByteBuffer0(dst, length); +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); +} +buf.read(dst, remaining); } @Override public int readToByteBuffer(ByteBuffer dst) { -return readToByteBuffer0(dst, dst.remaining()); - } - - private int readToByteBuffer0(ByteBuffer dst, int length) { MemoryBuffer buf = memoryBuffer; int remaining = buf.remaining(); -if (remaining < length) { - remaining += fillBuffer(length - remaining); +if (remaining > 0) { + buf.read(dst, remaining); Review Comment: > Did you mean `dst.remaining()`? `Channel` doesn't have an `available` method, we can just invoke `Channle.read` instead -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557623575 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -128,21 +128,21 @@ public void readToUnsafe(Object target, long targetPointer, int numBytes) { @Override public void readToByteBuffer(ByteBuffer dst, int length) { -readToByteBuffer0(dst, length); +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); +} +buf.read(dst, remaining); } @Override public int readToByteBuffer(ByteBuffer dst) { -return readToByteBuffer0(dst, dst.remaining()); - } - - private int readToByteBuffer0(ByteBuffer dst, int length) { MemoryBuffer buf = memoryBuffer; int remaining = buf.remaining(); -if (remaining < length) { - remaining += fillBuffer(length - remaining); +if (remaining > 0) { + buf.read(dst, remaining); Review Comment: If there are no remaining in `memoryBuffer`, and `channel` has extra data, the invocation will stop read anything here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: 800196678fc6f880730bb37e6b94db4d26ed835b / chaokunyang add license header Report URL: https://github.com/apache/incubator-fury/actions/runs/8616438668 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[GH] (incubator-fury): Workflow run "Fury CI" failed!
The GitHub Actions job "Fury CI" on incubator-fury.git has failed. Run started by GitHub user chaokunyang (triggered by chaokunyang). Head commit for run: 412aa111c1390f105f097b1806a66772138f1187 / chaokunyang fix check log level Report URL: https://github.com/apache/incubator-fury/actions/runs/8616415964 With regards, GitHub Actions via GitBox - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[PR] feat(java): implement fury logger and remove slf4j library [incubator-fury]
chaokunyang opened a new pull request, #1485: URL: https://github.com/apache/incubator-fury/pull/1485 This PR implemented fury logger and remove slf4j library Closes #1484 Closes #1404 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
Munoon commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557546277 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; Review Comment: Hmm, looks like we can also rewrite it using `readToByteBuffer(ByteBuffer, int)` method. I'll use this method in `read(ByteBuffer)` method at least until we decide the final implementation of `readToByteBuffer(ByteBuffer, int)`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
Munoon commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557534437 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); Review Comment: I thought we need it, to have the ability to *go back*. If we don't, to avoid using some transfer buffer we'll be to set a limit to the destination buffer. Is this acceptable? -- This is an
Re: [PR] feat(java): channel stream reader [incubator-fury]
Munoon commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557524745 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -128,21 +128,21 @@ public void readToUnsafe(Object target, long targetPointer, int numBytes) { @Override public void readToByteBuffer(ByteBuffer dst, int length) { -readToByteBuffer0(dst, length); +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); +} +buf.read(dst, remaining); } @Override public int readToByteBuffer(ByteBuffer dst) { -return readToByteBuffer0(dst, dst.remaining()); - } - - private int readToByteBuffer0(ByteBuffer dst, int length) { MemoryBuffer buf = memoryBuffer; int remaining = buf.remaining(); -if (remaining < length) { - remaining += fillBuffer(length - remaining); +if (remaining > 0) { + buf.read(dst, remaining); Review Comment: Did you mean `dst.remaining()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] chore: rename unsigned varint/varlong method name [incubator-fury]
chaokunyang commented on PR #1480: URL: https://github.com/apache/incubator-fury/pull/1480#issuecomment-2044866763 > Actually if it's C++/rust I will recommend to use template/generics instead of put type into function name. > > But not sure if it's a good way in Java. Java generics it erased, it's not feasible to invoke it using generics to dispatch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
[I] [Java] Remove SLF4J logger library [incubator-fury]
chaokunyang opened a new issue, #1484: URL: https://github.com/apache/incubator-fury/issues/1484 ## Is your feature request related to a problem? Please describe. SLF4J introduces some conflicts in graalvm image build, see #1404 And it's time consuming in #1481 ## Describe the solution you'd like Since the log won't be printed frequently, we may be able to replace it with a simple logger implemented by Fury. `Thread.currentThread().getStackTrace()[1].getLineNumber()` can be used to extrac line number ## Additional context -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [I] [Java] Map serialization bug [incubator-fury]
chaokunyang commented on issue #1481: URL: https://github.com/apache/incubator-fury/issues/1481#issuecomment-2044807196 ![image](https://github.com/apache/incubator-fury/assets/12445254/0874a429-27eb-459b-9a94-a72c944aeb9e) SLFJ4 seems took too long for init. Actually, I'm thinking whether we should remove SLFJ4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [I] [Java] Map serialization bug [incubator-fury]
chaokunyang commented on issue #1481: URL: https://github.com/apache/incubator-fury/issues/1481#issuecomment-2044795997 > Looks like the logger initialization issue (I have some log4j configuration files in classpath). However, disabling logging still didn't help, as it is still initialize in `ShimDispatcher`. I guess, it should be refactored with Furies `LoggerFactory`. WDYT? Yes, all logger should use FURY LoggerFactory, would you like to submit a PR to fix this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557455886 ## java/fury-core/src/main/java/org/apache/fury/memory/MemoryBuffer.java: ## @@ -156,8 +156,17 @@ private MemoryBuffer(long offHeapAddress, int size, ByteBuffer offHeapBuffer) { * the memory being released. * @param streamReader a reader for reading from a stream. */ - private MemoryBuffer( + public MemoryBuffer( Review Comment: We'd better keep it private, the MemoryBuffer should only be created by factory method -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557454244 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); Review Comment: This introduce an extra copy, the data doesn't have to copy into `memoryBuffer` to read into `dst` -- This is an automated message from the Apache Git Service. To respond to the message,
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557302317 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -128,21 +128,21 @@ public void readToUnsafe(Object target, long targetPointer, int numBytes) { @Override public void readToByteBuffer(ByteBuffer dst, int length) { -readToByteBuffer0(dst, length); +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < length) { + remaining += fillBuffer(length - remaining); +} +buf.read(dst, remaining); } @Override public int readToByteBuffer(ByteBuffer dst) { -return readToByteBuffer0(dst, dst.remaining()); - } - - private int readToByteBuffer0(ByteBuffer dst, int length) { MemoryBuffer buf = memoryBuffer; int remaining = buf.remaining(); -if (remaining < length) { - remaining += fillBuffer(length - remaining); +if (remaining > 0) { + buf.read(dst, remaining); Review Comment: We may need to check stream.available here, otherwise continuous invoking `readToByteBuffer` this method will always return 0 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); Review Comment: This introduce an extra copy, we can use `buf.read(dst, dstRemaining)` ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support
Re: [PR] chore: rename unsigned varint/varlong method name [incubator-fury]
PragmaTwice commented on PR #1480: URL: https://github.com/apache/incubator-fury/pull/1480#issuecomment-2044665812 Actually if it's C++/rust I will recommend to use template/generics instead of put type into function name. But not sure if it's a good way in Java. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] chore: rename unsigned varint/varlong method name [incubator-fury]
chaokunyang commented on PR #1480: URL: https://github.com/apache/incubator-fury/pull/1480#issuecomment-2044659762 > > The thing is that java `java.nio.ByteBuffer` use `getInt/getLong/putInt/putLong`. If we use a different name here, it may intruduce confusition to users. `org.apache.arrow.memory.ArrowBuf` also use `setInt/setLong/putInt/putLong` > > I still prefer `int32/int64`. We can emphasize the compatibility issues of ByteBuffer in the document. We can ask other members of the community for suggestions. Seems `int32/64` style is better, I will post this to the dev@fury apache.org for more feedbacks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: commits-unsubscr...@fury.apache.org For additional commands, e-mail: commits-h...@fury.apache.org
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557239843 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +readToByteBuffer0(dst, length); + } + + @Override + public int readToByteBuffer(ByteBuffer dst) { +return readToByteBuffer0(dst, dst.remaining()); Review Comment: `FuryInputStream ` only read `stream.available()` instead of read until `dst.remaining()`. If we read until `dst.remaining()`, it may block the current thread, which is unpected if users want to
Re: [PR] feat(java): channel stream reader [incubator-fury]
Munoon commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557102351 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +readToByteBuffer0(dst, length); + } + + @Override + public int readToByteBuffer(ByteBuffer dst) { +return readToByteBuffer0(dst, dst.remaining()); Review Comment: Do you think we should change the implementation of this method? As I see, the `FuryInputStream` is implemented in the similar way: it checks the `dst.remaining()` and read from stream if he didn't
Re: [PR] feat(java): channel stream reader [incubator-fury]
chaokunyang commented on code in PR #1483: URL: https://github.com/apache/incubator-fury/pull/1483#discussion_r1557089842 ## java/fury-core/src/main/java/org/apache/fury/io/FuryReadableChannel.java: ## @@ -20,30 +20,130 @@ package org.apache.fury.io; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.fury.exception.DeserializationException; import org.apache.fury.memory.MemoryBuffer; +import org.apache.fury.util.Platform; +import org.apache.fury.util.Preconditions; -// TODO support zero-copy channel reading. -public class FuryReadableChannel extends AbstractStreamReader implements ReadableByteChannel { +@NotThreadSafe +public class FuryReadableChannel implements FuryStreamReader, ReadableByteChannel { private final ReadableByteChannel channel; - private final ByteBuffer byteBuffer; - private final MemoryBuffer buffer; + private final MemoryBuffer memoryBuffer; + private ByteBuffer byteBuffer; public FuryReadableChannel(ReadableByteChannel channel) { -this(channel, ByteBuffer.allocate(4096)); +this(channel, ByteBuffer.allocateDirect(4096)); } - private FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { + public FuryReadableChannel(ReadableByteChannel channel, ByteBuffer directBuffer) { +Preconditions.checkArgument( +directBuffer.isDirect(), "FuryReadableChannel support only direct ByteBuffer."); this.channel = channel; this.byteBuffer = directBuffer; -this.buffer = MemoryBuffer.fromByteBuffer(directBuffer); + +long offHeapAddress = Platform.getAddress(directBuffer) + directBuffer.position(); +this.memoryBuffer = new MemoryBuffer(offHeapAddress, 0, directBuffer, this); + } + + @Override + public int fillBuffer(int minFillSize) { +try { + ByteBuffer byteBuf = byteBuffer; + MemoryBuffer memoryBuf = memoryBuffer; + int position = byteBuf.position(); + int newLimit = position + minFillSize; + if (newLimit > byteBuf.capacity()) { +int newSize = +newLimit < BUFFER_GROW_STEP_THRESHOLD ? newLimit << 2 : (int) (newLimit * 1.5); +ByteBuffer newByteBuf = ByteBuffer.allocateDirect(newSize); +byteBuf.position(0); +newByteBuf.put(byteBuf); +byteBuf = byteBuffer = newByteBuf; +memoryBuf.initDirectBuffer(Platform.getAddress(byteBuf), position, byteBuf); + } + byteBuf.limit(newLimit); + int readCount = channel.read(byteBuf); + memoryBuf.increaseSize(readCount); + return readCount; +} catch (IOException e) { + throw new DeserializationException("Failed to read the provided byte channel", e); +} } @Override public int read(ByteBuffer dst) throws IOException { -throw new UnsupportedEncodingException(); +int dstRemaining = dst.remaining(); +if (dstRemaining <= 0) { + return 0; +} +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining <= 0) { + return -1; +} +if (remaining >= dstRemaining) { + byte[] bytes = buf.readBytes(dstRemaining); + dst.put(bytes); + return dstRemaining; +} else { + int filledSize = fillBuffer(dstRemaining - remaining); + int length = remaining + filledSize; + byte[] bytes = buf.readBytes(length); + dst.put(bytes); + return length; +} + } + + @Override + public void readTo(byte[] dst, int dstIndex, int length) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining >= length) { + buf.readBytes(dst, dstIndex, length); +} else { + buf.readBytes(dst, dstIndex, remaining); + try { +ByteBuffer buffer = ByteBuffer.wrap(dst, dstIndex + remaining, length - remaining); +channel.read(buffer); + } catch (IOException e) { +throw new DeserializationException("Failed to read the provided byte channel", e); + } +} + } + + @Override + public void readToUnsafe(Object target, long targetPointer, int numBytes) { +MemoryBuffer buf = memoryBuffer; +int remaining = buf.remaining(); +if (remaining < numBytes) { + fillBuffer(numBytes - remaining); +} +long address = buf.getUnsafeReaderAddress(); +Platform.copyMemory(null, address, target, targetPointer, numBytes); +buf.increaseReaderIndex(numBytes); + } + + @Override + public void readToByteBuffer(ByteBuffer dst, int length) { +readToByteBuffer0(dst, length); + } + + @Override + public int readToByteBuffer(ByteBuffer dst) { +return readToByteBuffer0(dst, dst.remaining()); Review Comment: It's OK for this method to return data less than `dst.remaining()` since we returned a number how much data we read -- This is an automated message from the Apache Git Service. To respond to