A stream is a lazy data structure: it evaluates each piece of data only when we need it. Below is a typical `Stream` in Python.

    from functools import lru_cache


    class Stream:
        def __init__(self, head, restfn=None):
            self.head = head      # the already evaluated value
            self.restfn = restfn  # zero-argument function returning the next Stream, or None

        @property
        def stopped(self):
            return self.restfn is None

        @property
        @lru_cache()  # memoize the evaluated rest of the stream
        def rest(self):
            assert not self.stopped, 'No rest for stopped stream.'
            return self.restfn()

In practice, a stream is often defined recursively. For example, a stream that keeps returning the number `1` has the following form:

    def one_stream():
        return Stream(1, one_stream)

We can use `head` to get the evaluated value and `rest` to get the rest of the stream.

    >>> one_s = one_stream()
    >>> print(one_s.head)
    1
    >>> print(one_s.rest.head)
    1
    >>> print(one_s.rest.rest.head)
    1

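Chaining `.rest` by hand gets tedious quickly, so a small helper that walks the stream can be handy. The following is a minimal sketch, not part of the original code: `take` is a hypothetical name, and the `Stream` class is repeated only so that the block runs on its own.

```python
from functools import lru_cache


class Stream:
    def __init__(self, head, restfn=None):
        self.head = head
        self.restfn = restfn

    @property
    def stopped(self):
        return self.restfn is None

    @property
    @lru_cache()
    def rest(self):
        assert not self.stopped, 'No rest for stopped stream.'
        return self.restfn()


def one_stream():
    return Stream(1, one_stream)


def take(stream, n):
    """Hypothetical helper: collect up to n heads, stopping early if the stream stops."""
    heads = []
    while n > 0:
        heads.append(stream.head)
        n -= 1
        # Do not force the next element unless we actually need it.
        if n == 0 or stream.stopped:
            break
        stream = stream.rest
    return heads


first_five = take(one_stream(), 5)
print(first_five)  # [1, 1, 1, 1, 1]
```

Because `take` only touches `rest` when it needs another element, it is safe to use on an infinite stream such as `one_stream()`.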
You might notice that `one_stream` creates an infinite stream, meaning the stream never stops generating `1`. We can indicate a stopped stream by setting `restfn` to `None`. The example below shows a stopwatch stream that eventually stops after a given period.

    from time import time


    def stopwatch_stream(period, start=None):
        now = time()
        start = start or now                    # the first call fixes the start time
        left = max(0, period - (now - start))   # seconds left in the period, never negative
        rest = lambda: stopwatch_stream(period, start)
        return Stream(left, rest if left > 0 else None)

    >>> stopwatch_s = stopwatch_stream(10)
    >>> stopwatch_s.head
    10.0
    >>> stopwatch_s.rest.head
    5.976487159729004
    >>> stopwatch_s.rest.rest.head
    1.7044363021850586
    >>> stopwatch_s.rest.rest.rest.head
    0
    >>> stopwatch_s.rest.rest.rest.stopped
    True

The stream approach combines modules differently from object-oriented programming. Instead of thinking about a module's state at a specific moment, we think about the whole history of its states as a time series. Although we still handle states one by one, we don't care when a state change happens. With `stopwatch_stream`, the caller only has to care about the time left in the period.
Below, we build a `datagram_stream` on top of `stopwatch_stream`. It creates a stream that either is stopped or has received a datagram.

    import socket


    def receive(udp_server, timeout):
        # Wait for at most `timeout` seconds; return None if nothing arrives.
        udp_server.settimeout(timeout)
        try:
            return udp_server.recvfrom(8192)
        except socket.timeout:
            return None


    def datagram_stream(udp_server, timeout, left=None):
        left = left or stopwatch_stream(timeout)
        data = receive(udp_server, left.head) if not left.stopped else None
        rest = lambda: datagram_stream(udp_server, timeout, left.rest)
        # The stream stops as soon as a receive comes back empty.
        return Stream(data, rest if data else None)

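To see how `datagram_stream` behaves without opening a real socket, we can feed it a stand-in server. The sketch below is only an illustration: `FakeUDPServer` is a hypothetical class that replays queued datagrams and raises `socket.timeout` once the queue is empty, and the other definitions are repeated from above (with `timeout` passed along in `rest`) so that the block runs on its own.

```python
import socket
from collections import deque
from functools import lru_cache
from time import time


class Stream:
    def __init__(self, head, restfn=None):
        self.head = head
        self.restfn = restfn

    @property
    def stopped(self):
        return self.restfn is None

    @property
    @lru_cache()
    def rest(self):
        assert not self.stopped, 'No rest for stopped stream.'
        return self.restfn()


def stopwatch_stream(period, start=None):
    now = time()
    start = start or now
    left = max(0, period - (now - start))
    rest = lambda: stopwatch_stream(period, start)
    return Stream(left, rest if left > 0 else None)


def receive(udp_server, timeout):
    udp_server.settimeout(timeout)
    try:
        return udp_server.recvfrom(8192)
    except socket.timeout:
        return None


def datagram_stream(udp_server, timeout, left=None):
    left = left or stopwatch_stream(timeout)
    data = receive(udp_server, left.head) if not left.stopped else None
    rest = lambda: datagram_stream(udp_server, timeout, left.rest)
    return Stream(data, rest if data else None)


class FakeUDPServer:
    """Hypothetical stand-in for a UDP socket that replays queued datagrams."""

    def __init__(self, datagrams):
        self.queue = deque(datagrams)

    def settimeout(self, timeout):
        pass  # the fake never blocks, so the timeout is ignored

    def recvfrom(self, bufsize):
        if not self.queue:
            raise socket.timeout()  # mimic a socket whose timeout expired
        return self.queue.popleft()


sender = ('127.0.0.1', 9000)
server = FakeUDPServer([(b'a', sender), (b'b', sender)])
datagrams = datagram_stream(server, timeout=5)

first = datagrams.head        # (b'a', sender)
second = datagrams.rest.head  # (b'b', sender)
last = datagrams.rest.rest    # nothing left to receive: head is None, stream is stopped
```

The stream yields one datagram per element and ends with a stopped element whose `head` is `None`, which is the shape the follower, candidate, and leader streams rely on.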
A Raft follower stream only cares about the datagram stream: if it receives any datagram before the election timeout, it turns into a new follower stream; otherwise, it converts to a new candidate stream.

    def follower_stream(state):
        datagrams = datagram_stream(timeout=state.election_interval)
        # A stream that is stopped right away received nothing before the timeout.
        timeout = datagrams.stopped
        state = reduce_stream(handle_datagram, datagrams, init=state)
        rest = lambda: candidate_stream(state) if timeout else follower_stream(state)
        return Stream(state, rest)

`reduce_stream` can be very similar to the `reduce` function in functional programming. `handle_datagram` transforms an old `state` into a new `state` based on the received datagram. `candidate_stream` creates a new stream that performs the candidate behaviors.
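The post does not define `reduce_stream`, so here is one possible sketch. It assumes that a `head` of `None` means "nothing received" and should be skipped, which matches how `datagram_stream` ends. `from_list` is a hypothetical helper used only to build a finite stream for the demonstration, and the `Stream` class is repeated so that the block runs on its own.

```python
from functools import lru_cache


class Stream:
    def __init__(self, head, restfn=None):
        self.head = head
        self.restfn = restfn

    @property
    def stopped(self):
        return self.restfn is None

    @property
    @lru_cache()
    def rest(self):
        assert not self.stopped, 'No rest for stopped stream.'
        return self.restfn()


def reduce_stream(fn, stream, init):
    """Fold fn over the heads of a finite stream, from left to right."""
    acc = init
    while True:
        # Assumption: a head of None carries no data and is skipped.
        if stream.head is not None:
            acc = fn(acc, stream.head)
        if stream.stopped:
            return acc
        stream = stream.rest


def from_list(items):
    """Hypothetical helper: a finite stream that ends with a stopped None head."""
    if not items:
        return Stream(None)
    return Stream(items[0], lambda: from_list(items[1:]))


total = reduce_stream(lambda acc, x: acc + x, from_list([1, 2, 3]), init=0)
print(total)  # 6
```

Note that this version only returns once the stream stops, so it should not be called on an infinite stream such as `one_stream()`.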
Similarly, we can implement the candidate stream and the leader stream in the same way.

    def candidate_stream(state):
        state = elect_self(state)
        datagrams = datagram_stream(timeout=state.election_interval)
        timeout = datagrams.stopped
        state = reduce_stream(handle_datagram, datagrams, init=state)

        def rest():
            if timeout:
                return candidate_stream(state)  # no response in time: start a new election
            elif state.is_follower:
                return follower_stream(state)
            else:
                return leader_stream(state)

        return Stream(state, rest)


    def leader_stream(state):
        state = send_heartbeat(state)
        datagrams = datagram_stream(timeout=state.heartbeat_interval)
        state = reduce_stream(handle_datagram, datagrams, init=state)
        rest = lambda: follower_stream(state) if state.is_follower else leader_stream(state)
        return Stream(state, rest)

The candidate asks the other nodes to vote for it and then waits for votes until the timeout. The leader sends heartbeats to all other nodes and then waits for responses until the timeout.
By applying the Stream paradigm, we now only need to focus on implementing `def handle_datagram(state, datagram) -> state`, which is purely functional and testable. I'll leave its implementation to you as an exercise. 😃 Enjoy hacking!
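As a starting point for that exercise, the sketch below shows why a pure `handle_datagram` is easy to test. Everything in it is an assumption made for illustration: the `State` fields, the JSON payload with a `term` key, and the fact that only one Raft rule is covered (a node that sees a higher term adopts it and becomes a follower). A real implementation needs to handle votes, heartbeats, and log entries as well.

```python
import json
from typing import NamedTuple


class State(NamedTuple):
    # Hypothetical, minimal node state; a real Raft state has more fields.
    term: int
    is_follower: bool


def handle_datagram(state, datagram):
    """Return a new state; the old state is never modified."""
    payload, _address = datagram    # same shape as the result of recvfrom
    message = json.loads(payload)   # assumed wire format: JSON with a 'term' key
    if message['term'] > state.term:
        # A higher term was seen: adopt it and step down to follower.
        return state._replace(term=message['term'], is_follower=True)
    return state


sender = ('127.0.0.1', 9000)
candidate = State(term=2, is_follower=False)
newer = (json.dumps({'term': 3}).encode(), sender)
older = (json.dumps({'term': 1}).encode(), sender)

stepped_down = handle_datagram(candidate, newer)
unchanged = handle_datagram(candidate, older)
```

Since the function takes a state and a datagram and returns a state, a test needs no sockets, timers, or streams, only plain values.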