class Redis::ReplicationClient

Overview

If you're using Redis replication, you can use ReplicationClient to send read commands to replicas and reduce load on the primary. This can be important when your Redis primary is CPU-bound.

The commands that will be routed to replicas are listed in Redis::READ_ONLY_COMMANDS.

NOTE Redis replication does not provide consistency guarantees. Every mechanism in Redis to improve consistency, such as WAIT, is best-effort and not guaranteed. If you require strong consistency from Redis, stick to using Redis::Client. If you require strong consistency but your Redis primary is CPU-bound, you may need to choose between consistency and performance, or move that workload out of Redis.

This client is useful for operations where strong consistency isn't typically needed, such as caching, full-text search with Redis::FullText#search, querying time-series data with Redis::TimeSeries#mrange, checking the current state of larger data structures without blocking the primary, etc.
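
A minimal sketch of the routing described above. It assumes a replicated Redis deployment that the zero-argument constructor can reach (how that constructor locates the server is not documented on this page) and that GET is listed in Redis::READ_ONLY_COMMANDS. The key name is hypothetical.

```crystal
require "redis/replication_client"

# Assumption: the zero-argument constructor can find your Redis deployment.
redis = Redis::ReplicationClient.new

# SET is a write, so it is sent to the primary.
redis.set "cache:greeting", "hello"

# Assuming GET is in Redis::READ_ONLY_COMMANDS, this read may be served by a
# replica. Replication is asynchronous, so right after the write it can
# briefly return nil or a stale value.
maybe_stale = redis.get("cache:greeting")

# Reading through #on_primary avoids replication lag for this one command.
fresh = redis.on_primary &.get("cache:greeting")

pp maybe_stale, fresh
redis.close
```

For cache-style reads the first form is usually what you want; the second trades replica offloading for a read that sees the latest write.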

Explicitly routing commands to a primary or replica

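This class provides #on_primary and #on_replica methods to ensure your command is routed to the server type you want. This is useful, for example, when a read must observe a write you just made (use #on_primary), or when a read-only command is not listed in Redis::READ_ONLY_COMMANDS (use #on_replica).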

Topology changes

If the replication topology changes (for example, new replicas are added, existing ones are removed, or the primary fails over), ReplicationClient will automatically pick up the changes. You can set how often it checks for these changes with the #topology_ttl argument to the constructor, or leave it at its default of 10 seconds.
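
As a rough illustration of tuning the refresh interval, the sketch below passes topology_ttl to the entrypoint constructor documented under Constructor Detail. The URI is a hypothetical placeholder and 30 seconds is an arbitrary value, not a recommendation.

```crystal
require "uri"
require "redis/replication_client"

# Hypothetical entrypoint; replace it with any node of your replication setup.
entrypoint = URI.parse("redis://localhost:6379")

# Check for topology changes at most every 30 seconds instead of the
# default of 10 seconds.
redis = Redis::ReplicationClient.new(
  entrypoint: entrypoint,
  topology_ttl: 30.seconds
)

# The getter listed under Instance Method Detail reports the configured value.
puts redis.topology_ttl

redis.close
```

A longer TTL means fewer topology checks but a longer window in which the client may still talk to a removed replica or a demoted primary.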

EXPERIMENTAL ReplicationClient is currently in alpha testing. There may be rough edges.

Included Modules

Defined in:

replication_client.cr

Constant Summary

Log = ::Log.for(self)

Constructors

Instance Method Summary

Instance methods inherited from module Redis::Commands

decr(key : String)
decrby(key : String, amount : Int | String)
del(keys : Enumerable(String))
del(*keys : String)
eval(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
eval_ro(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
evalsha(sha : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
evalsha_ro(script : String, keys : Enumerable(String) = EmptyEnumerable.new, args : Enumerable(String) = EmptyEnumerable.new)
exists(*keys : String)
expire(key : String, ttl : Time::Span)
expire(key : String, ttl : Int)
expireat(key : String, at : Time)
flushall
flushdb
ft
get(key : String)
graph(key : String)
incr(key : String)
incrby(key : String, amount : Int | String)
info
json
keys(pattern = "*")
mget(keys : Enumerable(String))
mset(data : Hash(String, String))
pexpire(key : String, ttl : Time::Span)
pexpire(key : String, ttl : Int)
pexpireat(key : String, at : Time)
pttl(key : String)
publish(channel : String, message : String)
run(command)
scan(cursor : String = "0", match : String | Nil = nil, count : String | Int | Nil = nil, type : String | Nil = nil)
script_exists(shas : Enumerable(String))
script_exists(*shas : String)
script_flush(mode : ScriptFlushMode)
script_kill
script_load(script : String)
set(key : String, value : String, ex : String | Int | Nil = nil, px : String | Int | Nil = nil, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time, nx = false, xx = false, keepttl = false)
set(key, value, ex : Time::Span, nx = false, xx = false, keepttl = false)
ts
ttl(key : String)
type(key : String)
unlink(keys : Enumerable(String))
unlink(*keys : String)
wait(numreplicas replica_count : Int | String, timeout : Time::Span)
wait(numreplicas replica_count : Int | String, timeout : Int | String)

Instance methods inherited from module Redis::Commands::Stream

xack(key : String, group : String, id : String)
xack(key : String, group : String, ids : Enumerable(String))
xadd(key : String, id : String, maxlen, data : ::Hash(String, String))
xadd(key : String, id : String, data : ::Hash(String, String))
xadd(key : String, id : String, maxlen = nil, **data)
xautoclaim(key : String, group : String, consumer : String, min_idle_time : Time::Span, start : String, count : Int | String | Nil = nil)
xdel(key : String, ids : Enumerable(String))
xdel(key : String, *ids : String)
xgroup(command : String, key : String, groupname : String)
xgroup(command : XGroup, key : String, groupname : String, *, id : String | Nil = nil, mkstream = false, consumer_name : String | Nil = nil)
xgroup(command : String, key : String, groupname : String, *args : String)
xgroup_create(key : String, groupname : String, *, id : String = "$", mkstream = false)
xgroup_create_consumer(key : String, groupname : String, consumer_name : String)
xlen(key : String)
xpending(key : String, group : String, start : String, end finish : String, count : String | Int, idle : String | Time::Span | Nil = nil)
xpending(key : String, group : String)
xrange(key : String, start min : String, end max : String, count : String | Int | Nil = nil)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : ::Hash(String, String) = {} of String => String)
xreadgroup(group : String, consumer : String, count : String | Int | Nil = nil, block : Time::Span | String | Int | Nil = nil, no_ack = false, streams : NamedTuple = NamedTuple.new)

Instance methods inherited from module Redis::Commands::SortedSet

zadd(key : String, score : String | Int64, value : String)
zadd(key : String, values : Enumerable(String))
zcard(key : String)
zcount(key : String, min : String, max : String)
zrange(key : String, starting : String | Int64, ending : String | Int64, with_scores : Bool = false)
zrangebyscore(key : String, low : String | Int64, high : String | Int64, limit : Enumerable(String) | Nil = nil)
zrem(key : String, value : String)
zrem(key : String, values : Enumerable(String))
zrem(key : String, *values : String)
zremrangebyrank(key : String, low : Int64, high : Int64)
zremrangebyscore(key : String, low : String | Int64, high : String | Int64)
zrevrange(key : String, starting : String | Int64, ending : String | Int64, with_scores : Bool = false)
zscore(key : String, value : String)

Instance methods inherited from module Redis::Commands::Set

sadd(key : String, *values : String)
scard(key : String)
sdiff(first : String, second : String)
sinter(first : String, *others : String)
sismember(key : String, value : String)
smembers(key : String)
srem(key : String, members : Enumerable(String))
srem(key : String, *values : String)

Instance methods inherited from module Redis::Commands::List

blpop(keys : Enumerable(String), timeout : Time::Span)
blpop(*keys : String, timeout : Time::Span)
blpop(*keys : String, timeout : Int | Float)
blpop(*keys : String, timeout : String)
brpop(keys : Enumerable(String), timeout : Int)
brpop(*keys : String, timeout : Time::Span)
brpop(*keys : String, timeout : Number)
brpop(*keys : String, timeout : String)
brpoplpush(source : String, destination : String, timeout : Time::Span)
brpoplpush(source : String, destination : String, timeout : Int | String)
llen(key : String)
lmove(from source : String, to destination : String, from_side source_side : Side, to_side destination_side : Side)
lpop(key : String, count : String | Nil = nil)
lpush(key : String, values : Enumerable(String))
lpush(key, *values : String)
lrange(key : String, start : String | Int, finish : String | Int)
lrem(key : String, count : Int, value : String)
ltrim(key : String, start : String | Int, stop : String | Int)
ltrim(key : String, range : Range(String, String))
ltrim(key : String, range : Range(Int32, Int32))
rpop(key : String)
rpoplpush(source : String, destination : String)
rpush(key : String, values : Enumerable(String))
rpush(key, *values : String)

Instance methods inherited from module Redis::Commands::Hash

hdel(key : String, fields : Enumerable(String))
hdel(key : String, *fields : String)
hget(key : String, field : String)
hgetall(key : String)
hincrby(key : String, field : String, increment : Int | String)
hmget(key : String, fields : Enumerable(String))
hmget(key : String, *fields : String)
hmset(key : String, data : ::Hash(String, String))
hscan(key : String, cursor : String, *, match pattern : String | Nil = nil, count : String | Int | Nil = nil)
hset(key : String, fields : Enumerable(String))
hset(key : String, fields : ::Hash(String, String))
hset(key : String, *fields : String)
hset(key : String, **fields : String)
hsetnx(key : String, field : String, value : String)

Constructor Detail

def self.new(entrypoint : URI, topology_ttl : Time::Span = 10.seconds) #

Have the ReplicationClient discover the master and replicas on its own when given the URI of a single entrypoint. The cluster topology will be refreshed with a max staleness of #topology_ttl.

redis = Redis::ReplicationClient.new(

def self.new(master_uri : URI, replica_uris : Array(URI), topology_ttl : Time::Span = 10.seconds) #

Initialize the client with known master and replica URIs, keeping the topology up to date with at most #topology_ttl staleness. If you don't wish to keep the replication topology up to date, you can simply set #topology_ttl to 0.seconds.
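
A short sketch of this constructor, using the parameter names from the signature above. All three hostnames are hypothetical placeholders.

```crystal
require "uri"
require "redis/replication_client"

# Hypothetical hosts; substitute the addresses of your own servers.
redis = Redis::ReplicationClient.new(
  master_uri: URI.parse("redis://primary.internal:6379"),
  replica_uris: [
    URI.parse("redis://replica-1.internal:6379"),
    URI.parse("redis://replica-2.internal:6379"),
  ],
  # Per the description above, 0.seconds opts out of keeping the topology
  # up to date, so the client stays with the URIs given here.
  topology_ttl: 0.seconds
)

redis.close
```

Pinning the topology this way may suit fixed deployments, but the client would then not follow a failover on its own.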


def self.new #


Instance Method Detail

def close #

Close all connections to both the primary and all replicas.


def closed? : Bool #

Returns true if this ReplicationClient has been explicitly closed, false otherwise.
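
One way #close and #closed? might be used together is sketched below. It assumes the zero-argument constructor can reach your Redis deployment; the key name is hypothetical.

```crystal
require "redis/replication_client"

redis = Redis::ReplicationClient.new

begin
  redis.incr "counter"
ensure
  # Closes the connections to the primary and to every replica, even if the
  # command above raised.
  redis.close
end

# Per the description above, this should now report that the client has
# been explicitly closed.
puts redis.closed?
```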


def on_master(&) #

Alias of #on_primary.


def on_primary(&) #

Route one or more commands to the primary to avoid consistency issues arising from replication latency.

require "redis/replication_client"

redis = Redis::ReplicationClient.new

redis.incr "counter"
value = redis.on_primary &.get("counter")

This is useful for pipelining commands or executing transactions:

redis.on_primary &.transaction do |txn|
  txn.incr "counter:#{queue}"
  txn.sadd "queues", queue
  txn.lpush "queue:#{queue}", job_data
end

… which is shorthand for this and removes the need for nesting blocks:

redis.on_primary do |primary|
  primary.transaction do |txn|
    txn.incr "counter:#{queue}"
    txn.sadd "queues", queue
    txn.lpush "queue:#{queue}", job_data
  end
end

If you need to route many commands to the primary without necessarily pipelining or opening transactions, you can omit the &.transaction and call methods directly on the primary's Redis::Client in the block:

redis.on_primary do |primary|
  counter = primary.incr "counter:#{queue}"
  primary.sadd "queues", queue
end

NOTE The object yielded to the block is a Redis::Client, but if you try to use it outside the block you may run into errors because the replication topology could change, in which case this Redis::Client might not be the primary anymore.


def on_replica(&) #

Route one or more commands to replicas. This should rarely be necessary, since read-only commands (the only commands replicas can execute) are automatically routed to replicas. It is needed only for a read-only command this shard does not know about, that is, one missing from Redis::READ_ONLY_COMMANDS. Alternatively, you can shovel (<<) additional commands into Redis::READ_ONLY_COMMANDS to avoid having to perform this explicit routing.
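
A hedged sketch of both approaches, using STRLEN as an example of a read-only Redis command that has no dedicated method in the lists above. It assumes the block receives an object that responds to #run (as the #on_primary block does), that Redis::READ_ONLY_COMMANDS holds command names as strings, and that lowercase is the expected casing; check the constant before relying on this. The key name is hypothetical.

```crystal
require "redis/replication_client"

redis = Redis::ReplicationClient.new

# Option 1: route a single command to a replica explicitly.
length = redis.on_replica &.run({"strlen", "cache:greeting"})
pp length

# Option 2: register the command once so later calls are routed to replicas
# automatically. Assumption: entries are lowercase command-name strings.
Redis::READ_ONLY_COMMANDS << "strlen"
pp redis.run({"strlen", "cache:greeting"})

redis.close
```

Option 2 changes a shared constant, so it affects every ReplicationClient in the process; Option 1 keeps the routing decision local to one call site.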


def run(command full_command) #
Description copied from module Redis::Commands

Execute the given command and return the result from the server. The command must be an Enumerable whose size method is re-entrant.

run({"set", "foo", "bar"})

def topology_ttl : Time::Span #
