The core concept of all deterministic databases is simple: if your database is deterministic, multiple geographically-distributed replicas can execute the same transaction log independently, and they will all reach the same state. The problem of implementing a distributed database is reduced to implementing a distributed log.
However, there is one oddity of the Waltz design: the central locking mechanism. Other deterministic databases don't have anything like this, because it's not necessary. You can keep track of locks locally on each replica, and you can rely on every replica to reach the same conclusions about which transactions succeeded and failed, because they are deterministic.
Can anyone clarify why they are managing centralized locks?
Essentially, if each instance can validate transactions locally, the whole thing can be implemented easily with BookKeeper and its distributed log API [0]. BookKeeper guarantees a single writer (the master node appending to the log) for each ledger. All that needs to be done is to put concurrency control metadata (version, timestamp, ...) inside each message so every node can validate transactions locally.
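To make that concrete, here's a rough sketch of what I mean (names and field choices are made up for illustration, not BookKeeper's or Waltz's actual API): the writer stamps each message with the version it expected, and every node replaying the log reaches the same accept/reject decision.

    package sketch

    // LogEntry carries concurrency-control metadata alongside the payload,
    // so conflict detection can happen locally on every node.
    type LogEntry struct {
        Key             string
        ExpectedVersion int64 // version of Key the writer observed before appending
        Payload         []byte
    }

    // Replica is the state each node rebuilds by replaying the shared log.
    type Replica struct {
        versions map[string]int64 // current version per key
    }

    // Apply is deterministic: every replica that replays the same log in the
    // same order reaches identical commit/abort decisions and identical state.
    func (r *Replica) Apply(e LogEntry) bool {
        if r.versions[e.Key] != e.ExpectedVersion {
            return false // stale write; every replica rejects it the same way
        }
        r.versions[e.Key] = e.ExpectedVersion + 1
        return true
    }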
Daniel Abadi also has an excellent paper on column databases: http://db.csail.mit.edu/pubs/abadi-column-stores.pdf He's great at giving high-level overviews of database systems before digging into the implementations.
I have not gone into this at all, but my initial thought is that as long as every log entry also has a unique id of the last log entry at the time of its writing, you can always piece together a deterministic log without any locking or voting. Just share everything you know.
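To illustrate what I mean by "piece together a deterministic log" (purely hypothetical, just my own sketch of the idea): each entry names the newest entry its writer had seen, and every replica linearizes the resulting tree the same way, e.g. parents first, ties broken by id.

    package sketch

    import "sort"

    // Entry references the newest entry its writer knew about when appending.
    type Entry struct {
        ID     string
        PrevID string // empty for the very first entries
    }

    // Order produces a deterministic total order: parents before children,
    // siblings sorted by ID. Any replica holding the same set of entries
    // computes exactly the same sequence, with no locking or voting.
    func Order(entries []Entry) []Entry {
        byPrev := map[string][]Entry{}
        for _, e := range entries {
            byPrev[e.PrevID] = append(byPrev[e.PrevID], e)
        }
        var out []Entry
        var walk func(parent string)
        walk = func(parent string) {
            children := byPrev[parent]
            sort.Slice(children, func(i, j int) bool { return children[i].ID < children[j].ID })
            for _, c := range children {
                out = append(out, c)
                walk(c.ID)
            }
        }
        walk("") // start from entries that had no predecessor
        return out
    }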
This design can be extended to support interactive transactions.
Here's how:
1. Assign each transaction a log position. For strict serializability instead of just serializability, make this the highest log position ever acknowledged successfully to a client. This can be batched for throughput.
2. Have each client record its read set and write set: the objects/ranges the transaction read and the writes/operations to be performed.
3. Have clients persist this R/W set into the log, or send it directly to the lock server if the lock server is also the master assigning log positions. Again, use batching for throughput.
4. Run the lock server either as part of the master process that assigns log positions, or have it tail the log separately. The lock server will receive batches of transactions, take locks in the log's defined order, then commit another entry to the log with the commit/abort decision for each.
5. Respond to the client with the commit / abort decision.
To make this easier to program you'll probably want to include a read-your-writes cache on the client.
You can also scale out this log by having clients commit their operations to multiple logs. The only thing that needs to be serialized is the locking and commit / abort decision making. These other logs can be sharded by key range or just chosen randomly, as long as the commit / abort decision log includes which other log the data itself was committed to.
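Here's roughly what I imagine the lock server in step 4 doing, written as a resolver-style conflict check (this is only my sketch of the idea, not Waltz or FoundationDB code; all names are invented):

    package sketch

    // Txn is what the client persists in steps 2-3.
    type Txn struct {
        ID       string
        ReadPos  int64    // log position the transaction read at (step 1)
        ReadSet  []string // keys/ranges the transaction read
        WriteSet []string // keys/ranges the transaction writes
        LogPos   int64    // log position assigned to this transaction
    }

    // LockServer tracks, per key, the log position of the latest committed write.
    type LockServer struct {
        lastWrite map[string]int64
    }

    func NewLockServer() *LockServer {
        return &LockServer{lastWrite: make(map[string]int64)}
    }

    // Resolve processes a batch in log order and returns the commit/abort
    // decision for each transaction (step 4); the decisions then go back into
    // the log and out to the clients (step 5).
    func (s *LockServer) Resolve(batch []Txn) map[string]bool {
        decisions := make(map[string]bool, len(batch))
        for _, t := range batch {
            conflict := false
            for _, k := range t.ReadSet {
                if s.lastWrite[k] > t.ReadPos { // someone wrote k after this txn read it
                    conflict = true
                    break
                }
            }
            decisions[t.ID] = !conflict
            if !conflict {
                for _, k := range t.WriteSet {
                    s.lastWrite[k] = t.LogPos
                }
            }
        }
        return decisions
    }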
FoundationDB works roughly like this. The terminology is different (resolver instead of lock server, R/W set is conflict ranges, log position is version number, etc) but this is basically how it works.
Yes, it is more similar to Hyder than FoundationDB if you persist the R/W sets to the log itself. FoundationDB gets around this by only keeping the lock table in memory. When any process in the transaction pipeline fails, the master (which holds the version number) advances the version far enough into the future that any in-progress transaction would fail because it is too old.
Last I heard from some friends in YC when considering a position there, WePay was handling fewer than 10^6 payments per day. Is that still the case, and is something with such low requirements a good replacement for Kafka in the wild?
Your comment made me realize that I had confused WePay with WeChat Pay, which has a slightly different scale. The "We$VERB" field is getting a bit crowded.
I have this nasty feeling that, for the scale mentioned, the whole thing could safely run on a single high-performance server with a decent database. The various smaller services around it could either run on the same machine or be placed on one or more separate servers next to the main one. All nice and simple, with no distributed transaction problems.
Here's a dumb question about log-structured systems like this: does this system work nicely with backfills? Suppose you start logging events with Waltz and you want to migrate an existing system's data into the same log. Or something goes wrong and oncall needs to manually insert old events. Does Waltz have capabilities to backfill events into the historical log or reassign transaction IDs?
This might not be needed if this is strictly used for FIFO event consumption, but I guess I was thinking of trying to make a system like this support time-sliced queries.
Yeah, I'm currently getting bitten by various workloads that query by `created_at` instead of a canonical timestamp :) Thanks, I'll take a look!
For the "valid time" primitive I was thinking of implementing something like a hybrid logical clock that CockroachDB has (but with looser guarantees, mostly just need uniqueness and monotonicity). A sequential ID would provide a slightly nicer interface for pagination but has all the problems that I previously mentioned.
Looks like this also uses ZooKeeper. Does anyone know of a simple streaming log system / database? Like SQLite3, but for streaming? I'm using this pattern for personal projects more and more, and the solutions I see in this space are always big, distributed, and hard to set up and keep running.
I've been using a simple file format that just writes each message out sequentially as: [event-id][event-type][event-size][event-bytes]. There's also a small TCP server that speaks the Redis protocol to support remote access. But it's not really production code, more something I hacked together over a couple of weeks of evenings.
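The append path is roughly this (field widths are illustrative; the real sizes in my format may differ):

    package sketch

    import (
        "encoding/binary"
        "io"
    )

    // AppendEvent writes one record: [event-id][event-type][event-size][event-bytes].
    func AppendEvent(w io.Writer, id uint64, typ uint16, payload []byte) error {
        var hdr [14]byte
        binary.LittleEndian.PutUint64(hdr[0:8], id)
        binary.LittleEndian.PutUint16(hdr[8:10], typ)
        binary.LittleEndian.PutUint32(hdr[10:14], uint32(len(payload)))
        if _, err := w.Write(hdr[:]); err != nil {
            return err
        }
        _, err := w.Write(payload)
        return err
    }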
This is a project of mine that I use for some side projects for exactly that reason: I want streaming logs & pub/sub semantics. It's basically Kafka xtra-lite. It doesn't have a super simple TCP protocol (though I wouldn't say no to a PR adding one!); instead it's available over both gRPC and HTTP (JSON API).
Disclaimer: It's definitely one of my low-activity side projects and subject to change at any time.
Is Redis Streams backed by disk? How does it perform when the size of the stream exceeds available RAM?
Also, what's Redis' multi-node failover/high availability story these days (with streams)? Last I heard, it wasn't that great [1], but it's been a while.
Redis is an in-memory data store with different persistence options (RDB snapshots plus an append-only oplog), but it's not designed to persist every operation immediately. All data structures are covered, including Streams.
Redis keeps the entire working set in RAM, so it'll start dropping writes or freeze if you run out of memory. This is where the simplicity and speed come from, and it's a fundamental limitation.
There's a simple replication system that works well, but failover is the problem: it requires a separate process or running Redis Sentinel. There's also Redis Cluster, but that just shards the keyspace, doesn't offer any scalability for a single key or stream, and is still hard to manage, with the same failover issues.
The OP asked for a single-node option so I suggested Redis, if you need a serious messaging cluster then I recommend Kafka or Pulsar instead.
Thanks. That confirms pretty much the picture I had in my head of how Redis works these days.
It's frustrating that there's no obvious middle ground between this and Kafka or Pulsar, both of which are memory-hungry JVM apps with multiple external dependencies. Both require ZooKeeper; Pulsar also requires BookKeeper. None of these components are operationally simple.
I'm a fan of NATS itself, but NATS Streaming's clustering design leaves a lot to be desired. In particular, it punts on failover/HA, asking you to instead run an SQL database or shared file system that provides this. (An obvious low-maintenance option here would be CockroachDB, but NATS doesn't support it.)
I wouldn't put RabbitMQ in this category: it's a classical message broker, not a log. Once you've consumed a message, it's gone, unless you've set things up so ACKed messages are funneled into another queue, but that stuff is finicky and doesn't change the fact that underneath it's designed for mutable, ephemeral queues. In particular, you can't peek back into the queue to find older items. You have zero visibility into the contents of the queue, and you certainly can't treat it as a dependable database.
And, as you say, fragile. I've run RMQ in production for years and I would be very happy if I could throw it out. It's the least well-behaved component in any stack I've used it in. Even Elasticsearch (shudder) is better at not losing data. And it's not just the clustering, either. Even as a persistent queue, RMQ will chew up RAM for every message that has been delivered but not yet ACKed, which makes it dangerous for apps that want to batch large groups of messages for efficiency. (It seems to me it was never designed for that, but for one-by-one consumption, which is of course much slower.)
I'm looking for a mature distributed log that is clustered and lightweight. Kafka except, say, written in Go.
Note that NATS on its own is just a pub/sub system and doesn't have any queuing or persistence. There is no "log". You need the NATS Streaming server for that, which implements persistence while communicating over NATS.
Liftbridge is an experiment and has no real production users. NATS Streaming is already fine for lightweight single-host usage. If you really need to scale to multiple servers then I recommend skipping NATS Streaming and going straight to Kafka or Apache Pulsar.
Other than that, there are embedded queuing/log libraries or you can just dump the messages into a key/value store or write them out to a file yourself.
Hey, is there a tracker somewhere for this? I've always been into trying Kafka, but I heard horror stories about ZK so having it removed would be nice.
> This works well, but the drawback is that a service has to write to two separate storage systems, a database and Kafka. We still need check-and-repair.
The first paragraph on that page covers that at a high level.
> Waltz is similar to existing log systems like Kafka in that it accepts/persists/propagates transaction data produced/consumed by many services. However, unlike other systems, Waltz provides a machinery that facilitates a serializable consistency in distributed applications. It detects conflicting transactions before they are committed to the log.