Redis Stream Examples#

basic config#

[1]:
redis_host = "redis"
stream_key = "skey"
stream2_key = "s2key"
group1 = "grp1"
group2 = "grp2"

connection#

[2]:
import redis
from time import time
from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError, ResponseError

r = redis.Redis( redis_host )
r.ping()
[2]:
True

xadd and xread#

add some data to the stream#

[3]:
for i in range(0,10):
    r.xadd( stream_key, { 'ts': time(), 'v': i } )
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10

read some data from the stream#

[4]:
## read 2 entries from stream_key
l = r.xread( count=2, streams={stream_key:0} )
print(l)
[[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}), (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]

extract data from the returned structure#

[5]:
first_stream = l[0]
print( f"got data from stream: {first_stream[0]}")
fs_data = first_stream[1]
for id, value in fs_data:
    print( f"id: {id} value: {value[b'v']}")
got data from stream: b'skey'
id: b'1657571033115-0' value: b'0'
id: b'1657571033117-0' value: b'1'
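
The reply is a list of `[stream_name, entries]` pairs, with every name, ID, field and value returned as bytes. As a minimal sketch, the hypothetical helper `decode_xread_reply` (not part of redis-py) converts such a reply into plain strings; the sample reply is copied from the output above.

```python
def decode_xread_reply(reply):
    """Convert an xread reply into {stream_name: [(entry_id, fields), ...]} using str."""
    decoded = {}
    for stream_name, entries in reply:
        decoded[stream_name.decode()] = [
            (entry_id.decode(), {k.decode(): v.decode() for k, v in fields.items()})
            for entry_id, fields in entries
        ]
    return decoded


# Sample reply copied from the xread output shown earlier.
sample = [[b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'}),
                     (b'1657571033117-0', {b'ts': b'1657571033.1176307', b'v': b'1'})]]]

for entry_id, fields in decode_xread_reply(sample)['skey']:
    print(f"id: {entry_id} value: {int(fields['v'])}")
# prints:
# id: 1657571033115-0 value: 0
# id: 1657571033117-0 value: 1
```

Creating the client with `decode_responses=True` is another way to receive strings instead of bytes, at the cost of changing every reply shown in this notebook.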

read more data from the stream#

If we call xread with the same arguments, we get the same data again.

[6]:
l = r.xread( count=2, streams={stream_key:0} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033115-0' value: b'0'
id: b'1657571033117-0' value: b'1'

To get new data we need to change the ID passed to the call: xread returns only entries with an ID greater than the one given.

[7]:
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033118-0' value: b'2'
id: b'1657571033119-0' value: b'3'
[8]:
last_id_returned = l[0][1][-1][0]
l = r.xread( count=2, streams={stream_key: last_id_returned} )
for id, value in l[0][1]:
    print( f"id: {id} value: {value[b'v']}")
id: b'1657571033119-1' value: b'4'
id: b'1657571033121-0' value: b'5'
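
The IDs above have the form `<milliseconds>-<sequence>`: entries added within the same millisecond, such as `1657571033119-0` and `1657571033119-1`, differ only in the sequence part. The following sketch, using a hypothetical helper `parse_stream_id`, shows how such IDs compare. It is an illustration of the ordering, not code taken from redis-py.

```python
def parse_stream_id(raw_id):
    """Split a stream ID into (milliseconds, sequence) as integers."""
    if isinstance(raw_id, bytes):
        raw_id = raw_id.decode()
    ms, _, seq = str(raw_id).partition('-')
    # An ID without a sequence part (for example "0") is treated as sequence 0.
    return int(ms), int(seq or 0)


# IDs copied from the outputs above, deliberately shuffled.
ids = [b'1657571033119-1', b'1657571033118-0', b'1657571033121-0', b'1657571033119-0']
ordered = sorted(ids, key=parse_stream_id)
print(ordered)
# prints:
# [b'1657571033118-0', b'1657571033119-0', b'1657571033119-1', b'1657571033121-0']
```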

To get only entries added after the call is issued, pass the special ID '$'.

[9]:
print( f"stream length: {r.xlen( stream_key )}")
# wait for 5s for new messages
l = r.xread( count=1, block=5000, streams={stream_key: '$'} )
print( f"after 5s block, got an empty list {l}, no *new* messages on the stream")
print( f"stream length: {r.xlen( stream_key )}")
stream length: 10
after 5s block, got an empty list [], no *new* messages on the stream
stream length: 10

2nd stream#

Add some messages to a 2nd stream

[10]:
for i in range(1000,1010):
    r.xadd( stream2_key, { 'v': i } )
print( f"stream length: {r.xlen( stream2_key )}")
stream length: 10

get messages from the 2 streams

[11]:
l = r.xread( count=1, streams={stream_key:0,stream2_key:0} )
for k,d in l:
    print(f"got from {k} the entry {d}")
got from b'skey' the entry [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]
got from b's2key' the entry [(b'1657571042111-0', {b'v': b'1000'})]
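
When reading from several streams, each stream needs its own last-seen ID for the next call. A minimal sketch, assuming a hypothetical helper `advance_cursors` that takes the `streams` dict and a reply and returns the dict to pass to the next xread; the sample reply is copied from the output above.

```python
def advance_cursors(cursors, reply):
    """Return a copy of cursors with each stream moved to its last returned ID."""
    updated = dict(cursors)
    for stream_name, entries in reply:
        # Reply names are bytes; the dict passed to xread here uses str keys.
        name = stream_name.decode() if isinstance(stream_name, bytes) else stream_name
        if entries:
            updated[name] = entries[-1][0]
    return updated


# Sample reply copied from the two-stream xread output shown earlier.
reply = [
    [b'skey', [(b'1657571033115-0', {b'ts': b'1657571033.1128936', b'v': b'0'})]],
    [b's2key', [(b'1657571042111-0', {b'v': b'1000'})]],
]
cursors = advance_cursors({'skey': 0, 's2key': 0}, reply)
print(cursors)
# prints:
# {'skey': b'1657571033115-0', 's2key': b'1657571042111-0'}
```

A stream that returned no entries keeps its previous ID, so the resulting dict can be passed as `streams=` on the next call.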

stream groups#

With groups it is possible to track, on the Redis side and for many consumers, which messages have already been consumed.

add some data to streams#

Add 10 more messages to each of the 2 streams.

[12]:
def add_some_data_to_stream( sname, key_range ):
    for i in key_range:
        r.xadd( sname, { 'ts': time(), 'v': i } )
    # report the length of the stream that was just written to
    print( f"stream '{sname}' length: {r.xlen( sname )}")

add_some_data_to_stream( stream_key, range(0,10) )
add_some_data_to_stream( stream2_key, range(1000,1010) )
stream 'skey' length: 20
stream 's2key' length: 20

use a group to read from the stream#

  • create a group grp1 on the stream skey, and

  • create a group grp2 on the streams skey and s2key

Use xinfo_groups to verify the result of the group creation.

[13]:
## create the group
def create_group( skey, gname ):
    try:
        r.xgroup_create( name=skey, groupname=gname, id=0 )
    except ResponseError as e:
        print(f"raised: {e}")

# group1 reads the stream 'skey'
create_group( stream_key, group1 )
# group2 reads the streams 'skey' and 's2key'
create_group( stream_key, group2 )
create_group( stream2_key, group2 )

def group_info( skey ):
    res = r.xinfo_groups( name=skey )
    for i in res:
        print( f"{skey} -> group name: {i['name']} with {i['consumers']} consumers and {i['last-delivered-id']}"
              + f" as last read id")

group_info( stream_key )
group_info( stream2_key )
skey -> group name: b'grp1' with 0 consumers and b'0-0' as last read id
skey -> group name: b'grp2' with 0 consumers and b'0-0' as last read id
s2key -> group name: b'grp2' with 0 consumers and b'0-0' as last read id

group read#

The xreadgroup method reads from a stream through a consumer group.

[14]:
def print_xreadgroup_reply( reply, group = None, run = None):
    for d_stream in reply:
        for element in d_stream[1]:
            print(  f"got element {element[0]}"
                  + f"from stream {d_stream[0]}" )
            if run is not None:
                run( d_stream[0], group, element[0] )
[15]:
# read some messages on group1 with consumer 'c'
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
                  count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1657571033115-0'from stream b'skey'
got element b'1657571033117-0'from stream b'skey'

A 2nd consumer in the same stream group gets the messages that have not been delivered yet.

[16]:
# read some messages on group1 with consumer 'c2'
d = r.xreadgroup( groupname=group1, consumername='c2', block=10,
                  count=2, streams={stream_key:'>'})
print_xreadgroup_reply( d )
got element b'1657571033118-0'from stream b'skey'
got element b'1657571033119-0'from stream b'skey'

But a 2nd stream group can read the already delivered messages again.

Note that the 2nd stream group also includes the 2nd stream. The source stream of each entry can be identified in the reply (1st element of each item in the reply list).

[18]:
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
                   count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571033115-0'from stream b'skey'
got element b'1657571033117-0'from stream b'skey'
got element b'1657571042111-0'from stream b's2key'
got element b'1657571042113-0'from stream b's2key'

To check for pending messages (messages delivered but not yet acknowledged) we can use xpending.

[19]:
# check pending status (messages read without an ack)
def print_pending_info( key_group ):
    for s,k in key_group:
        pr = r.xpending( name=s, groupname=k )
        print( f"{pr.get('pending')} pending messages on '{s}' for group '{k}'" )

print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
4 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'

ack#

Acknowledge some messages with xack.

[20]:
# acknowledge, for group1, the messages held in d (the last read, by consumer 'c2')
toack = lambda k,g,e: r.xack( k,g, e )
print_xreadgroup_reply( d, group=group1, run=toack )
got element b'1657571033118-0'from stream b'skey'
got element b'1657571033119-0'from stream b'skey'
[21]:
# check pending again
print_pending_info( ((stream_key,group1),(stream_key,group2),(stream2_key,group2)) )
2 pending messages on 'skey' for group 'grp1'
2 pending messages on 'skey' for group 'grp2'
2 pending messages on 's2key' for group 'grp2'

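Read and ack all the remaining messages on group1. The 2 messages from the first read by consumer 'c' were never acknowledged, so they stay pending.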

[22]:
d = r.xreadgroup( groupname=group1, consumername='c', block=10,
                      count=100, streams={stream_key:'>'})
print_xreadgroup_reply( d, group=group1, run=toack)
print_pending_info( ((stream_key,group1),) )
got element b'1657571033119-1'from stream b'skey'
got element b'1657571033121-0'from stream b'skey'
got element b'1657571033121-1'from stream b'skey'
got element b'1657571033121-2'from stream b'skey'
got element b'1657571033122-0'from stream b'skey'
got element b'1657571033122-1'from stream b'skey'
got element b'1657571049557-0'from stream b'skey'
got element b'1657571049557-1'from stream b'skey'
got element b'1657571049558-0'from stream b'skey'
got element b'1657571049559-0'from stream b'skey'
got element b'1657571049559-1'from stream b'skey'
got element b'1657571049559-2'from stream b'skey'
got element b'1657571049560-0'from stream b'skey'
got element b'1657571049562-0'from stream b'skey'
got element b'1657571049563-0'from stream b'skey'
got element b'1657571049563-1'from stream b'skey'
2 pending messages on 'skey' for group 'grp1'
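
A toy in-memory model can make the bookkeeping behind these pending counts concrete. The class `ToyConsumerGroup` below is purely illustrative: it is an assumption about how to picture the behaviour seen in this notebook, not how Redis implements consumer groups, and it uses the integers 0..19 in place of real stream IDs.

```python
class ToyConsumerGroup:
    """Illustrative model only: tracks delivered and pending entries for one group."""

    def __init__(self, entry_ids):
        self.entry_ids = list(entry_ids)
        self.delivered = 0   # number of entries already handed out to consumers
        self.pending = {}    # entry id -> consumer that received it, until acked

    def read_new(self, consumer, count):
        """Mimic reading with '>': hand out entries never delivered to this group."""
        batch = self.entry_ids[self.delivered:self.delivered + count]
        self.delivered += len(batch)
        for entry_id in batch:
            self.pending[entry_id] = consumer
        return batch

    def ack(self, *entry_ids):
        """Remove the given entries from the pending set; return how many were removed."""
        acked = 0
        for entry_id in entry_ids:
            if self.pending.pop(entry_id, None) is not None:
                acked += 1
        return acked


grp1 = ToyConsumerGroup(range(20))
d = grp1.read_new('c', 2)      # first read by 'c'
d = grp1.read_new('c2', 2)     # d is overwritten, as in the cells above
print(len(grp1.pending))       # prints: 4
grp1.ack(*d)                   # acks only the entries read by 'c2'
print(len(grp1.pending))       # prints: 2
d = grp1.read_new('c', 100)    # the remaining 16 entries
grp1.ack(*d)
print(grp1.pending)            # prints: {0: 'c', 1: 'c'}
```

In this model the two leftover entries are the ones from the first read, whose reply was overwritten before any ack was sent.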

But the stream length stays the same after the xack calls on group1: acknowledging a message does not remove it from the stream.

[24]:
r.xlen(stream_key)
[24]:
20

delete all#

To remove the messages we need to delete them explicitly with xdel.

[25]:
s1 = r.xread( streams={stream_key:0} )
for streams in s1:
    stream_name, messages = streams
    # delete every ID in the message list
    for i in messages:
        r.xdel( stream_name, i[0] )

stream length

[26]:
r.xlen(stream_key)
[26]:
0
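
Since xdel accepts several IDs in one call, the per-entry loop can also be collapsed into a single command. A minimal sketch, assuming a hypothetical helper `delete_all_entries` that takes any client exposing `xread` and `xdel` with the redis-py signatures used in this notebook:

```python
def delete_all_entries(client, key):
    """Delete every entry currently in the stream `key`; return how many were removed."""
    # Without a count, xread returns all entries after ID 0.
    reply = client.xread(streams={key: 0})
    ids = [entry_id for _, entries in reply for entry_id, _ in entries]
    if not ids:
        # XDEL needs at least one ID, so skip the call for an empty stream.
        return 0
    return client.xdel(key, *ids)


# Usage against the connection created earlier (requires a running Redis):
# deleted = delete_all_entries(r, stream2_key)
```

For a very large stream it may be preferable to read and delete in batches instead of collecting every ID at once.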

After the xdel, the 2nd group can no longer read the messages it had not yet processed from skey; only entries from s2key are returned.

[27]:
d2 = r.xreadgroup( groupname=group2, consumername='c', block=10,
                   count=2, streams={stream_key:'>',stream2_key:'>'})
print_xreadgroup_reply( d2 )
got element b'1657571042113-1'from stream b's2key'
got element b'1657571042114-0'from stream b's2key'