[ https://issues.apache.org/jira/browse/BEAM-11997?focusedWorklogId=755490&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-755490 ]
ASF GitHub Bot logged work on BEAM-11997: ----------------------------------------- Author: ASF GitHub Bot Created on: 11/Apr/22 23:19 Start Date: 11/Apr/22 23:19 Worklog Time Spent: 10m Work Description: pabloem commented on code in PR #15549: URL: https://github.com/apache/beam/pull/15549#discussion_r847813348 ########## sdks/java/io/redis/src/main/java/org/apache/beam/sdk/io/redis/RedisCursor.java: ########## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.redis; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Objects; +import javax.annotation.Nonnull; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.io.range.ByteKey; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class RedisCursor implements Comparable<RedisCursor>, Serializable { + + public static final RedisCursor ZERO_CURSOR = RedisCursor.of("0", 8); + + private final String cursor; + private final ByteKey byteCursor; + private final long dbSize; + private final int nBits; + + public static RedisCursor of(String cursor, long dbSize) { + return new RedisCursor(cursor, dbSize); + } + + public static RedisCursor of(ByteKey byteCursor, long dbSize) { + return new RedisCursor(byteCursor, dbSize); + } + + private RedisCursor(ByteKey byteCursor, long dbSize) { + this.byteCursor = byteCursor; + this.dbSize = dbSize; + this.nBits = getTablePow(dbSize); + this.cursor = byteKeyToString(byteCursor, nBits); + } + + private RedisCursor(String cursor, long dbSize) { + this.cursor = cursor; + this.dbSize = dbSize; + this.nBits = getTablePow(dbSize); + this.byteCursor = stringCursorToByteKey(cursor, this.nBits); + } + + /** + * {@link RedisCursor} implements {@link Comparable Comparable<RedisCursor>} by transforming + * the cursors to an index of the Redis table. + */ + @Override + public int compareTo(@Nonnull RedisCursor other) { + checkNotNull(other, "other"); + return Long.compare(Long.parseLong(cursor), Long.parseLong(other.cursor)); + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RedisCursor that = (RedisCursor) o; + return dbSize == that.dbSize + && nBits == that.nBits + && Objects.equals(cursor, that.cursor) + && Objects.equals(byteCursor, that.byteCursor); + } + + @Override + public int hashCode() { + return Objects.hash(cursor, byteCursor, dbSize, nBits); + } + + public String getCursor() { + return cursor; + } + + public ByteKey getByteCursor() { + return byteCursor; + } + + public long getDbSize() { + return dbSize; + } + + @VisibleForTesting + static ByteKey stringCursorToByteKey(String cursor, int nBits) { + long cursorLong = Long.parseLong(cursor); + long reversed = shiftBits(cursorLong, nBits); + BigEndianLongCoder coder = BigEndianLongCoder.of(); + ByteArrayOutputStream os = new ByteArrayOutputStream(); + try { + coder.encode(reversed, os); + } catch (IOException e) { + throw new IllegalArgumentException("invalid redis cursor " + cursor); + } + byte[] byteArray = os.toByteArray(); + return ByteKey.copyFrom(byteArray); + } + + @VisibleForTesting + static long shiftBits(long a, int nBits) { + long b = 0; Review Comment: can you please use Long.reverse? Issue Time Tracking ------------------- Worklog Id: (was: 755490) Time Spent: 14h 20m (was: 14h 10m) > Implement RedisIO on top of Splittable DoFn > ------------------------------------------- > > Key: BEAM-11997 > URL: https://issues.apache.org/jira/browse/BEAM-11997 > Project: Beam > Issue Type: Improvement > Components: io-java-redis > Reporter: Boyuan Zhang > Priority: P2 > Time Spent: 14h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.1#820001)