Jay Kreps, CEO and Co-Founder of Confluent, discusses ksqlDB, a SQL engine for data in Apache Kafka. Jay talks about stream processing and Kafka, and how that data can now be queried with push and pull queries in ksqlDB, similar to a relational database. Jay discusses some of the similarities and differences between SQL databases and ksqlDB and outlines how ksqlDB simplifies building real-time applications over streaming data with a simple query model. Jay talks about its capabilities with examples of how it can be used to build real-time applications, its internals, and how it fits into the Kafka ecosystem.
Show Notes
- Episode 393 – Jay Kreps on Enterprise Integration with a Kafka Event Log
- Apache Kafka with Jun Rao
- Apache Kafka
- ksqlDB
- ksqlDB Architecture & Concepts
Transcript
Transcript brought to you by IEEE Software magazine and IEEE Computer Society.
This transcript was automatically generated. To suggest improvements in the text, please contact [email protected] and include the episode number and URL.
Akshay Manchale 00:00:55 For Software Engineering Radio, this is Akshay Manchale. Today we have Jay Kreps and we're going to talk about ksqlDB. Jay is the CEO and co-founder of Confluent. Prior to Confluent, he was the lead architect for data and infrastructure at LinkedIn. He's the initial developer of several open-source projects, including Apache Kafka. Jay was previously a guest on SE Radio on Episode 162, and more recently on Episode 393. Jay, welcome back to the show.
Jay Kreps 00:01:20 Thanks so much for having me.
Akshay Manchale 00:01:21 So before we get into ksqlDB, let’s talk about a few basics. Let’s start with Kafka. What is Kafka?
Jay Kreps 00:01:28 Yeah, that's a good question. So we call it an event streaming platform, but that kind of begs the question of what the heck is event streaming? And so, it really is kind of a new category of thing that's come around. Some people compare it to being a bit like a message queue that you send messages to, and it kind of queues them up and sends them out. But it's a bit more than that. So the idea is, Kafka acts as a kind of distributed cluster, and you can read and write streams of events. And by events, I mean, you say something happened and it gets recorded in a kind of linear stream, almost like an array of these events. It just keeps growing and growing, and the writes all go to the end of the array, and readers can read along in that stream.
Jay Kreps 00:02:15 And so, those events could be anything. They could be things happening in the business: sales that are occurring, messages of all kinds. So this is a very new thing. Kafka itself has become quite popular now, so it's out in the world really at large scale. We're doing our Kafka Summit this year, and it looks like we'll have over 25,000 people signed up to attend that virtually. So, it's really become a popular layer for building these kinds of asynchronous, event-driven microservices that communicate through events. It's become popular for building real-time, low-latency pipelines that stream data into different data systems or SaaS layers or APIs, and it's become part of the modern stack in a lot of ways.
Jay Kreps 00:03:12 And so that's kind of the basic concept. You can interact with Kafka through a couple different APIs. There are lower-level APIs where you would produce or write data into one of these streams. There's a consumer API where you would read or subscribe to data that's being written. And then there's an API which allows you to use prebuilt connectors that hook up and maybe capture changes occurring in a database or publish changes out into some data store or SaaS application. There are hundreds of these pre-built connectors for different types of systems and applications. You can kind of hook them up and turn their data into streams or take streams in Kafka and send them off to other systems. And then there's a stream processing API in Kafka, which allows you to take these streams of data and react or respond to or process them as they occur. So you could imagine maybe you have a stream of the sales that are occurring in some e-commerce application, and you can imagine triggering order fulfillment or counting how many sales you had by geographical region as those sales occurred. And so those are the different ways of interacting with these streams of data. And that's kind of the 30-second, in-a-nutshell explanation of what Kafka is.
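To make the producer and consumer sides of that concrete, here is a minimal Java sketch against those two APIs. It is only an illustration: the broker address, topic name, group ID, and payload are assumptions, not anything from the episode.

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SalesTopicExample {
    public static void main(String[] args) {
        // Producer: append one sale event to the end of the "sales" topic.
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("sales", "order-42",
                    "{\"customerId\":\"c-7\",\"region\":\"CA\",\"amount\":12.99}"));
        }

        // Consumer: subscribe and read along in the stream. Reading does not remove anything,
        // so any number of consumer groups can read the same topic independently.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "order-fulfillment");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(List.of("sales"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("offset=%d key=%s value=%s%n",
                    r.offset(), r.key(), r.value()));
        }
    }
}
```

Each consumer group tracks its own position (offset) in the log, which is what makes the multi-subscriber reads described here possible.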
Akshay Manchale 00:04:32 Great. So I keep hearing things like Kafka topics. So what’s a Kafka topic?
Jay Kreps 00:04:37 Yeah. So topic is kind of a terminology we borrowed from the messaging world. It's really just the name for one of these data streams. So in this e-commerce application, I might have a sales topic, and there might be many ways that a sale occurs, right? Maybe it occurs on an iPhone app, or it occurs through some backend system, or wherever. And you can imagine each of those sales being published out to this topic, which means really just appended to that kind of infinitely growing array that I described. And that might trigger all kinds of activity on the backend. So there might be a system that does order fulfillment, and every time a new sale is published, it would react and do whatever is needed to send the information off to a warehouse, get that thing into a box, and get it sent off; it would be responsible for that. Maybe there's something that updates the information for customers. Maybe there's something that updates the coupons and sets up the returns, whatever it is that you would do. Maybe there are analytical systems that provide reporting that would need to update. They would also subscribe to that feed of sales and react to it. And so the name for that feed or stream is a topic in Kafka terminology.
Akshay Manchale 00:05:55 Interesting. So what’s different about Kafka compared to a message broker that moves data from a publisher to a subscriber?
Jay Kreps 00:06:02 Yeah, that's a great question. So when we were building in this area, there were a whole set of different kinds of queues and messaging layers. There are some foundational technology differences in Kafka that make it a little different; I'll describe those, and then I'll describe, from the user's point of view, what you get that's different. So as a technology layer, what's really different is Kafka is storing persistent, replicated events or messages, right? And it can store them really for any amount of time, and you can hold lots of them. And that means everything is kind of multi-subscriber. Any of these topics can have zero subscribers or consumers, or it could have dozens, hundreds of subscribers or consumers. And so it's this idea of really orienting around events being always multi-subscriber, being persistent, and then building around a kind of modern distributed system that allows this to scale out horizontally and do so more elastically and dynamically.
Jay Kreps 00:07:07 All of that is fairly different from a traditional message queue, which is more oriented around individual servers that would hold messages and put them on disk only if needed. And beyond that, Kafka doesn't really provide a queue abstraction: when you read a message, it doesn't pop it off the queue. Instead, it has this kind of stream. And it turns out those qualities end up being really important if you're trying to build a reliable data pipeline that guarantees delivery. It turns out it's really important if you're trying to hook up multiple applications or systems within a company and guarantee that they all get the data, and it's operationally very important just because you don't want something where, if the memory fills up and it goes to disk, it suddenly gets 10 times slower and tips over.
Jay Kreps 00:07:54 And for all those reasons, I think Kafka has ended up being a much better platform for large-scale usage within a company. And then it also turns out, and this was part of our motivation, that this abstraction is a much better foundation for stream processing, for working with this data in real time. So if you think about it, a message queue is a pretty low-level abstraction. You can kind of put a message in the queue and you can take it out, but everything beyond that is kind of up to you to go figure out in your code. And a lot of the things we do with data are actually common patterns, and that's why databases don't just have put and get; they also have a bunch of functionality that allows you to compute things off of the data.
Jay Kreps 00:08:37 And that’s, I’m sure we’ll get into that when we talk about ksqlDB and stream processing. It’s very, very hard to build that on top of a traditional queue. And Kafka was really designed with that in mind. Actually, we started with the idea of–hey, it would make sense to do a lot more real time processing on data. How would we do that? And then we realized, well, the first thing you need, if you want stream processing is you need streams and you need them at scale across the company. And that was actually what motivated a lot of the early Kafka development.
Akshay Manchale 00:09:05 With that information, let’s jump into ksqlDB, which is our main topic. So what is ksqlDB?
Jay Kreps 00:09:12 Yeah, that's a great question. So I started to touch on stream processing and it's probably worth just describing what I mean by that, and what the difference is from batch processing or more traditional databases. So, I'll explain it by analogy. The way databases typically work is they have kind of a pile of data. Whenever you run a query, they actually need the data to be static and unchanging, and the query is just kind of at a point in time. And so a database would usually lock that table or have some kind of concurrency mechanism to get a view on that data that doesn't change. It would scan over it and compute the results of your query, and it would give them back. And so the assumption is that the query is just a very transient thing, and it gives you an answer at a point in time.
Jay Kreps 00:09:59 Of course, that answer is potentially immediately out of date; it doesn't get updated. But that actually makes sense. If you're building a UI-driven application, you kind of want to know the answer as of right now; you don't really care what the answer is in the future in a lot of cases, although even UIs are changing a lot as we have these more dynamic UIs that get push updates sent to them. And so, that's the traditional model for databases. And the analogy I think makes it easiest to understand stream processing is that in the US they have a census that they do every 10 years. And the way the census works is they send these people out with binders and they go door to door and they record everybody's name and address, and that's their way of counting how many people there are.
Jay Kreps 00:10:45 And you can think of that as a kind of batch process, right? It's scanning all the data and it's saying, okay, here's a person, here's another person, here's another person. And then when you want to get an update on that, you have to kind of go back and you got to scan all the data again. And so if you explain that to a software engineer, they're like–hey, that's kind of funny. I don't know if I would build it that way. Probably what I would do is I would record all the births and the deaths, and if you had that stream of births and deaths, you could keep kind of a running count of how many people there were, right? And that's exactly what stream processing is. So the births and deaths, those are events, right? Those are big events in people's lives.
Jay Kreps 00:11:25 You could imagine having attached to that all the information about the person who was born or who died, right? So everything you would want to know in a census is there: gender, ethnicity, whatever. And then you could imagine keeping a kind of running count on top of that that would say, okay, now in Mountain View, California, we have 75,692 people. A baby is born, now it's 75,693, right? So, that's the fundamental idea behind stream processing. And you can think of it another way, which is just, if you have this Kafka cluster and it has all these real-time streams of data, you want to do things with that: you want to harness those, calculate things, take one stream of data and translate it into something else, and stream processing is kind of how you do that. And so, Kafka has a set of lower-level APIs that help to do this.
Jay Kreps 00:12:12 They help you to do real time aggregation, a kind of continuous running count on data. They help you to do joins. So in our e-commerce example, maybe you have this stream of sales, but you also have information about the customer, and you need to put those together for fulfillment purposes. Okay, this sold, but where do I ship it? Well, I need the address, I got to join on the information about the customer. Maybe you’re just doing simple filtering, maybe you’re looking for the sales that are above a certain amount, all the kinds of things you might do in a database. So there’s kind of low-level capabilities for doing that in Kafka. And so what is ksqlDB? Well, that’s really our attempt to bring this out in a more database-like format. So in a database, you can run SQL queries and a lot of us know SQL.
Jay Kreps 00:12:59 It's not a perfect language, but it's an easy language. It gives a lot of the expressivity of what people want to do with data. And you wouldn't necessarily know if it was applicable to this kind of stream processing. But it turns out it is; it turns out it's actually a great way of doing real-time processing. And what we're trying to do in ksqlDB is really, in some sense, bring together what we think are the two sides of databases. One of these sides is really well developed, which is, if you have a pile of data, can you do lookups and queries on top of it? And in fact, that's kind of the only thing we know in databases. But the other side is, if you have a flow of data, can you calculate and update things off of that?
Jay Kreps 00:13:42 And that's always been a little underdeveloped in databases. They have some features like triggers or materialized views that are very simplistic, but it's not actually expressed with the full power of SQL on those streams. And so, an example of this would be a data warehouse. Once you get all your data in the data warehouse, the data warehouse gives you all this wonderful query functionality to do aggregations and lookups and things on top of it. How you get the data in and how you get it into the right format is a little bit of a dark art, right? There's usually some hacked-together system that shoves things in, and then you run some big batch processes on top of it. But you can imagine a different world, in which the data in the data warehouse is always up to date.
Jay Kreps 00:14:26 There are queries that run on that data as it flows in, and there are queries that do lookups to run reports and so on. And that kind of illustrates, I think, the two sides of this query dynamic: changing data as it flows and looking up data as it sits. And in ksqlDB, those are represented with two concepts, streams and tables, and you can query both of them. A stream is this kind of continual set of changes, a Kafka topic, and tables are aggregates we've calculated, right? Or lookups; it's something that's keyed, right? So you might have a table that's all your customers by some primary ID, and you might have a stream of all the sales that occurred. In a data warehouse they would call these facts and dimensions.
Jay Kreps 00:15:13 They have this idea of facts, which is basically kind of events, although you can't really subscribe to them. And they have dimensions, which is the kind of stored data about customers and products and so on. So those are the concepts. And then we support this by having the idea of push queries and the idea of pull queries. So the kind of traditional query where you go do a quick lookup, that's a pull query; you're kind of pulling the data out. The other kind of query is a push query. It runs as the data kind of pushes into the system. And by putting those together, it's actually really powerful. You can imagine use cases where you're kind of building some cache. Maybe you're taking feeds of data out of different data systems and putting together some aggregate view of your customer or some pre-materialized view of data that you need to serve some use case. And that view gets updated every time the underlying data changes, and then you can do lookups against it to get a particular row or value. Does that make sense?
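As a rough sketch of those two concepts, the statements below declare a stream over an existing sales topic and a table that is continuously maintained from it, submitted here through ksqlDB's Java client. The host and port, topic, and column names are illustrative assumptions, and the client calls should be checked against the ksqlDB docs for the version in use.

```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;

public class DefineStreamAndTable {
    public static void main(String[] args) throws Exception {
        // Assumed: a ksqlDB server reachable on localhost:8088.
        Client client = Client.create(ClientOptions.create().setHost("localhost").setPort(8088));

        // A stream: an append-only view over the existing "sales" Kafka topic.
        client.executeStatement(
            "CREATE STREAM sales (order_id VARCHAR KEY, region VARCHAR, amount DOUBLE) " +
            "WITH (KAFKA_TOPIC='sales', VALUE_FORMAT='JSON');").get();

        // A table: a continuously maintained aggregate (a materialized view) over that stream,
        // updated every time a new sale event arrives.
        client.executeStatement(
            "CREATE TABLE sales_by_region AS " +
            "SELECT region, COUNT(*) AS orders, SUM(amount) AS revenue " +
            "FROM sales GROUP BY region EMIT CHANGES;").get();

        client.close();
    }
}
```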
Akshay Manchale 00:16:07 Yeah. So when you say a pull query, when I do the same thing on a traditional database, is that the same as a pull query when I say select star from table, is that what a pull query is in ksqlDB?
Jay Kreps 00:16:18 Yeah, that’s right. That’s right. And so, the difference now with this kind of streaming query is that you can now have subscriptions that continue to update as the underlying data changes. So, traditional query is kind of a point in time thing, like–hey, what’s the value right now? But you could imagine for systems that are actually subscribing, you need not just the point in time, but also the ability to get all the changes. And so you might say–hey, give me the count of the number of people in every city, but instead of just getting it right now and having it be out of date, you would get it and have it update as people moved between cities or as people were born or died. And that’s basically the idea of extending databases for stream processing for events and streams.
Akshay Manchale 00:17:04 So in that example, let’s say I have a dashboard that is displaying the running count of people. It’s just a single query that I write, and the changes are just automatically coming into the client with a push query. Is that sort of the idea there?
Jay Kreps 00:17:17 Yeah, that’s the basic idea, is that you can both get a point in time lookup as well as continually subscribe and calculate changes.
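Continuing that sketch, a pull query and a push query against the sales_by_region table might look like this. The exact client method names are assumptions to be verified; the same queries can equally be run from the ksqlDB CLI or REST API.

```java
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;

public class PushAndPullQueries {
    public static void main(String[] args) throws Exception {
        Client client = Client.create(ClientOptions.create().setHost("localhost").setPort(8088));

        // Pull query: a point-in-time lookup against the materialized table, like a SELECT in a
        // traditional database. It returns the current answer and completes.
        for (Row row : client.executeQuery(
                "SELECT region, orders FROM sales_by_region WHERE region = 'CA';").get()) {
            System.out.println("now: " + row.values());
        }

        // Push query: a standing subscription. EMIT CHANGES keeps the query open and pushes a
        // new row to the client every time the underlying aggregate changes.
        StreamedQueryResult changes = client.streamQuery(
                "SELECT region, orders FROM sales_by_region EMIT CHANGES;").get();
        Row next;
        while ((next = changes.poll()) != null) {
            System.out.println("update: " + next.values());
        }

        client.close();
    }
}
```

A dashboard like the one described above would sit on the push query and simply repaint whenever a new row is pushed to it.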
Akshay Manchale 00:17:26 So what’s a table in ksqlDB? Is that traditionally the same as what a relational database looks like? Does it have a schema and types and all of that?
Jay Kreps 00:17:35 Yeah, that's right. So, the type system and the SQL are very similar to what you would expect in a traditional database. It's just that now, in addition to saying create table with the appropriate attributes, you can also say create stream, and you can think of a stream as being like a kind of table of immutable data that has a strong ordering and is only appended to over time. Effectively it's like a fact table in a data warehouse, and a table in ksqlDB is very much like a table in Postgres. It's something that typically has some kind of primary key, and you would insert into it or change it or look up from it, or modify it in a continuous way. And the key thing that ksqlDB allows is for you to go back and forth from streams to tables.
Jay Kreps 00:18:21 So in that example I gave of e-commerce, you can imagine having this stream of all the sales that are occurring, and you could take that, and maybe also the stream of shipments of products that are happening, and using those two, you could compute what's my inventory on hand. And so the stream of shipments, that's like events, right? This was shipped, this arrived. The stream of sales is also events, this sold. But inventory is actually a table. It's like, for each product, how many of these do we have on hand in each location? And it's going up and down as the shipments are occurring. And so, the cool thing about this is it allows you to go back and forth between this idea of streams and this idea of tables. And I think it's actually a very powerful generalization of databases and how we work with data that really has a lot of applications. We see the use cases being these kinds of streaming data pipelines, real-time pipelines where you're capturing data from different systems and transforming it, as well as materializing these caches, and use cases around security where you're looking for bad patterns. But it's actually quite applicable across a lot of different use cases and systems.
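The shipments-and-sales-to-inventory idea can be sketched directly in ksqlDB SQL. The statements are held in a string constant here only to keep to one language for the examples; they could just as well be run from the ksqlDB CLI. All topic and column names are illustrative assumptions.

```java
public class InventoryPipelineSketch {
    static final String STATEMENTS = """
        -- Two event streams over existing topics: goods arriving and goods being sold.
        CREATE STREAM shipments (product VARCHAR KEY, qty INT)
          WITH (KAFKA_TOPIC='shipments', VALUE_FORMAT='JSON');
        CREATE STREAM product_sales (product VARCHAR KEY, qty INT)
          WITH (KAFKA_TOPIC='product_sales', VALUE_FORMAT='JSON');

        -- Merge them into one stream of signed inventory changes.
        CREATE STREAM inventory_changes (product VARCHAR KEY, delta INT)
          WITH (KAFKA_TOPIC='inventory_changes', PARTITIONS=1, VALUE_FORMAT='JSON');
        INSERT INTO inventory_changes SELECT product, qty AS delta FROM shipments;
        INSERT INTO inventory_changes SELECT product, -1 * qty AS delta FROM product_sales;

        -- A table derived from the merged stream: the current on-hand count per product.
        CREATE TABLE inventory AS
          SELECT product, SUM(delta) AS on_hand
          FROM inventory_changes
          GROUP BY product
          EMIT CHANGES;
        """;
}
```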
Akshay Manchale 00:19:33 So what is the source for ksqlDB data? Is it a consumer from a Kafka topic?
Jay Kreps 00:19:38 Yeah, ksqlDB always works on top of Kafka, but it actually allows you to control the Kafka connectors as well. So I mentioned Kafka has this Connect API, and the Connect API is a way of building these simple plugins that either capture a stream of data from a system, so there are plugins for MySQL or Postgres or Oracle, there are plugins for different messaging layers, there are plugins for a lot of cloud data systems, there are plugins for different SaaS APIs. So it can either capture a stream or it can take a stream that's in Kafka and publish it out to some system. So you can imagine maybe you have a MySQL database and you're capturing all the changes coming in from that database and publishing them out to Kafka, or you could imagine taking a stream from Kafka and loading it up into the MySQL database.
Jay Kreps 00:20:27 So, there are connectors usually in both directions. We call them sources and sinks. And so, to make these kinds of pipelines easy, ksqlDB also allows you to control those connectors. So you can actually have a pretty end-to-end flow for streaming data where you say, okay, connect to this database, capture all the changes, connect to that database, capture all the changes, take those streams, combine them in this way, materialize this new table of data that is being computed off of that, and now do lookups against it. And so it is actually kind of an end-to-end solution in this event streaming space. Whereas really, before this I feel like people had to piece together a bunch of different things to get something: there was some stream processing engine which would do the transformation, there was Kafka, there were integrations with different systems, and then there was some database you would do the lookups against. You had to kind of wire all that together and figure out how to secure it and so on. And so this makes it a little bit simpler in that you really just have Kafka and ksqlDB to work with, and they're kind of built to work together and have a similar set of abstractions and security features and so on.
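As a sketch of ksqlDB driving Connect, a change-capture connector can be declared from the same SQL session. The connector class and property names below follow the Debezium MySQL source connector's documented settings, but treat every name and value here as an assumption to check against that connector's documentation.

```java
public class ConnectorSketch {
    // Hypothetical source connector that captures changes from a MySQL database into Kafka.
    // The password is a placeholder; in practice it would come from a secrets mechanism.
    static final String CREATE_CONNECTOR = """
        CREATE SOURCE CONNECTOR customers_cdc WITH (
          'connector.class'      = 'io.debezium.connector.mysql.MySqlConnector',
          'database.hostname'    = 'mysql.internal',
          'database.port'        = '3306',
          'database.user'        = 'cdc_user',
          'database.password'    = 'cdc_password',
          'database.server.name' = 'shop',
          'table.include.list'   = 'shop.customers'
        );
        """;
}
```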
Akshay Manchale 00:21:38 So when you are ingesting data into ksqlDB, can you have some sort of transformation from the source before you ingest it? Or is it one-to-one with respect to what's in a Kafka topic, which you can just query differently? What's the difference there?
Jay Kreps 00:21:52 Yeah, typically people don't do a ton of transformation as part of that load. You would typically do it afterwards in ksqlDB. But yeah, we do support what we call single message transforms, which allow you to do simplistic munging on a single row as it comes in, and that ends up being important. In some cases, there's some sensitive data you don't want to load, or you want to obfuscate, or you want to encrypt in a particular way, kind of on the way in, without it ever being stored in the original form.
Akshay Manchale 00:22:22 So you can query ksqlDB like SQL, but is there any difference between regular SQL and ksqlDB?
Jay Kreps 00:22:29 Yeah, the pull queries are certainly more limited. So, at the moment they're limited to relatively simple lookups. We're working on broadening that further over time. You can imagine our goal isn't to really replace all databases with this. It really is for use cases around event streams where you would otherwise be gluing together multiple technologies. So if you have a UI-driven app that's using Postgres, we wouldn't come in and say, oh, stop that and use this ksqlDB thing. It really is in the cases where you're trying to do things with streams of data around Kafka that this can suddenly add a lot of simplicity: what would've been a bunch of custom code and integration between multiple systems, this can do. So yeah, we are broadening the pull query features over time. But obviously if all you're ever doing is pull queries, there are 101 databases to do that. The magic here really is the push queries, the kind of stream processing side of the equation, which is a feature that really hasn't existed elsewhere.
Akshay Manchale 00:24:10 Can you talk about the components that make up a ksqlDB system with respect to how it works in an existing Kafka deployment and how it fits in?
Jay Kreps 00:24:19 Yeah, absolutely. So, in a traditional database architecture, you have a kind of commit log, and then you have these tables that have your data represented in different ways. Effectively, this looks a lot like that, except that the commit log is now Kafka, and the tables are actually materialized in the ksqlDB cluster. So in ksqlDB, you would have a set of nodes, and those nodes read and write streams of data from Kafka. They may materialize it into RocksDB, which is not actually a full database in the sense of having SQL and supporting remote access. It's an embedded key-value store that just maintains the data format on disk. And so that's how the kind of indexes and tables are stored to allow lookups. And so that's how it works.
Jay Kreps 00:25:07 And this allows ksqlDB to be kind of elastically scalable. You can add new nodes and they will take over part of the work of query processing automatically. So you can kind of scale it out or scale it down elastically by adding or removing nodes from that cluster. And it works similar to Kafka itself, where there's a kind of partitioned workload. So you can imagine taking one of these tables and chopping it up into little pieces and storing those pieces spread across the cluster. And you can do that with some replication so that if one of the nodes fails, it doesn't go away. But the kind of long-term store for all data is in Kafka itself; the RocksDB instance is really just kind of a cache. And so even if it's replicated, it is just kind of a quick lookup index, not the kind of authoritative source, if that makes sense.
Akshay Manchale 00:26:00 So if you were to lose the RocksDB instance, you could rebuild that data from your Kafka topics?
Jay Kreps 00:26:06 Yeah, that's exactly right. And it's common with stream processing: you may calculate a bunch of things and then want to recalculate them later, and you may indeed want to throw away all your data and kind of start over and reprocess from the beginning as a simplistic way of getting there. So that's kind of the basic architecture. All the actual processing and computation happens in the ksqlDB nodes; Kafka is just being Kafka. It's reading and writing the streams and replicating them and storing them. So it's not the case that these queries are kind of going off to the Kafka cluster and doing anything like that. One of the things we felt was very important was making sure that Kafka itself works really well as a multi-tenant system that can be shared across many teams and use cases without tipping over.
Jay Kreps 00:26:53 Whereas with ksqlDB, you can do all kinds of complicated munging, and you can have many ksqlDB clusters that feed off the same Kafka cluster. They can all share data, but they don't interfere with each other. If you do some kind of very abusive query processing in your ksqlDB instances, it won't hurt your coworkers. And I think that's an important property for any of these things that go across teams: to think through how that multi-tenancy architecture is meant to work. And so for us, Kafka's the shared part, and it does very simple, predictable things. ksqlDB does all the complicated stuff, but it is meant to be clusters that are not really shared across teams or meant to be multi-tenant. Instead, you would give each tenant their own capacity, and it's actually a very flexible model. Typically in databases, if I give you your own database, you don't have access to my data, but because of this shared commit log at the center, everybody has access to the same stuff. It's just a question of how you index it and process it in different clusters.
Akshay Manchale 00:27:56 Speaking of indexes, relational databases tend to have certain indexes. Can you create additional indexes on top of the massaged data that you're consuming from Kafka topics and ingesting into ksqlDB? Can you have indexes on top of that for faster lookups and all of that?
Jay Kreps 00:28:12 Yeah. So we don't support secondary indexes yet, but you can of course just materialize the data in different ways, which effectively serves the same purpose and is maybe more in the style of this kind of stream processing. We'll probably add secondary indexes just to make the pull queries simpler in certain cases and have less materialized data, but there hasn't been a big push for it yet.
Akshay Manchale 00:28:33 So, ksqlDB sort of simplifies how you can access the data. Kafka Streams, the streams API on top of Kafka, lets you do the same thing. So what's the correspondence between the two? Can you do everything that you can do with Kafka Streams in ksqlDB and vice versa?
Jay Kreps 00:28:49 Yeah, I would say you can do the vast majority; obviously with an open API that supports code, you can do more than you could do in SQL. So the relationship is this: the Streams API is actually a lower-level set of Java primitives for doing stream processing operations, joins, aggregations, filtering, all the things you might imagine doing with data. It looks like one of these kind of fluent APIs where you chain together a bunch of operations, one after the other. If you've used Java, the streams interface in Java is similar to this, where you would chain maps and reduces and aggregates, and so it works like that, except it does this on these infinite streams, not just finite collections, and does it in a persistent and partitioned and distributed and fault-tolerant way.
Jay Kreps 00:29:42 So that's what the Streams API in Kafka provides. ksqlDB is actually built on top of that. So it uses those primitives under the covers, but it does more than that. It provides a SQL layer on top of that. It also provides remote access so that you can run these kinds of pull queries; you can send new queries to it. And then it also controls the Connect API in Kafka, so you can run connectors, and it brings both of those together. And so we feel like that really broadens the appeal. Maybe Java has, whatever you want to call it, 40% of the programmer market, but kind of everybody knows SQL, and it's also just a lot less work to put together a SQL query than it is to build and test a full Java application.
Jay Kreps 00:30:28 So it really depends on the use case; it's not that one exactly replaces the other. For building a very complicated custom set of business logic, there's a good chance you're still going to do that in Java. But for a lot of simple data transformations, materializing caches of data, looking for bad patterns for security, that's the kind of thing where you just want to write the query, just say what you mean, not write a ton of code and compile it and deploy it and all that kind of stuff. And those kinds of simple streaming use cases are really where ksqlDB shines.
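For comparison, the same kind of sales-by-region count written against the lower-level Kafka Streams DSL might look roughly like this. It is a sketch: topic names are assumptions and the JSON handling is stubbed out.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class SalesByRegionStreamsApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sales-by-region");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // A fluent chain of operations: read the sales stream, keep the records we care about,
        // re-key each record by region (which repartitions the data), and keep a running count.
        KStream<String, String> sales = builder.stream("sales");
        KTable<String, Long> countsByRegion = sales
                .filter((orderId, value) -> value != null)
                .groupBy((orderId, value) -> extractRegion(value))
                .count();
        countsByRegion.toStream().to("sales-count-by-region",
                Produced.with(Serdes.String(), Serdes.Long()));

        new KafkaStreams(builder.build(), props).start();
    }

    // Stand-in for real JSON parsing of the region field.
    static String extractRegion(String json) {
        return json.contains("\"region\":\"CA\"") ? "CA" : "other";
    }
}
```

The equivalent ksqlDB statement is the short CREATE TABLE ... GROUP BY shown earlier, which is the trade-off being described: less flexibility, much less code.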
Akshay Manchale 00:31:02 So there seems to be a trade-off between ease of use and expressiveness of what you want to do: for expressivity, use Kafka Streams, and otherwise use ksqlDB.
Jay Kreps 00:31:13 I think that's exactly right. You could think of there being kind of a hierarchy here, where the lowest-level producer and consumer APIs that read and write streams kind of have all the power in Kafka; they have the transactional processing. In theory, if you're willing to write enough code, you can do everything with that. But that's a lot of code in most cases, and you would end up rewriting the same stuff over and over again. And that's where the kind of connector API and Streams API provide the next layer up in that stack. It's a little bit higher of an abstraction. It makes it easier to get correct results and reason about what happens if the machine fails in the middle of processing something. But you do trade off a little bit of flexibility in how you write that versus the low-level read and write. And then one level up from that, I think, is ksqlDB. So the analogy you could use is, if you've ever used one of these key-value interfaces like RocksDB itself, it's very flexible in allowing you to work with data at a low level, probably more so than a SQL interface, but it's actually a lot more work for the kind of simple stuff you might want to do than using a SQL database.
Akshay Manchale 00:32:19 Is there a notion of transactions within ksqlDB? Can I ingest a particular set of events only if a certain condition is satisfied? Is there an all-or-nothing sort of guarantee, like traditional relational databases have?
Jay Kreps 00:32:33 Yeah, there is, although it's a little bit different. So, since the domain is stream processing, and the pull queries are mostly for looking up results, it's really about how you can guarantee the correctness of that processing. And you can imagine a lot of corner cases here. So, what if a message was delivered and processed, and then the stream processing application died and then it kind of came back and it gets that message again? What happens? So in my census example where I was counting births and deaths, you don't want to double count a birth, right, and end up with the wrong number of people in a given city. And there are a lot of corner cases like that that you could imagine.
Jay Kreps 00:33:18 And it's very similar actually to the domain of transactions in normal databases, and the underlying Kafka APIs support a transactional concept. And so it works just like you would imagine a transaction: you say begin, you do a bunch of writes, and then you say commit, and then all of those writes either happen together or they don't happen at all. And so the stream processing functionality in ksqlDB uses that, so it says, okay, I'm working across these different topics and so on; I need to make sure that whatever happens, network glitches, one of the nodes fails, I still get the correct output. In this case, correct output means the same output I would've gotten if nothing had gone wrong. Right? And people often refer to this as exactly once semantics, meaning you get the same semantics as if the message had been delivered just one time.
Jay Kreps 00:34:13 Even though, of course, under the hood you may send some message over the network, you don't know if it got there, the thing fails, it comes back, it gets it again; under the hood many things are happening. You want to get the same output you would as if everything worked perfectly. And so that's supported within ksqlDB, and it's an important thing to have there. There has been a whole history of stream processing layers over the last five-plus years, and they've kind of built up functionality that makes it easier and easier to use and so on. A lot of the early ones didn't really have any real guarantees in this area, or they had very weak guarantees. Of course, that makes it very, very hard to write any kind of important application on top of it.
Jay Kreps 00:34:57 If you can't guarantee that you get the right output, then it's only really usable for things where the answer doesn't matter. And so that kind of guarantee is very important. It is end to end, because you have to reason about what's in Kafka, ksqlDB's part, and how it gets out into the other systems that it might ultimately end up in as a destination. And so that is another part of the simplicity I think you can bring in this area by having a bunch of components that are built to work together.
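At the raw producer level, the begin/write/commit pattern described here looks roughly like the following sketch. ksqlDB and Kafka Streams drive this machinery for you through their exactly-once processing mode rather than you writing it by hand; topic names and payloads are illustrative.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalWriteSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "census-counter-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                // Both writes commit together or not at all, so a crash between them can never
                // leave a half-applied update (for example, a double-counted birth).
                producer.send(new ProducerRecord<>("births", "mountain-view",
                        "{\"name\":\"example\"}"));
                producer.send(new ProducerRecord<>("population-by-city", "mountain-view", "75693"));
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
```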
Akshay Manchale 00:35:24 On the query side of things, can you join different tables, different Kafka topics to get a result that you want?
Jay Kreps 00:35:32 Yeah, absolutely. And the use case for that in stream processing isn't always obvious to people, but there's actually quite a lot of it. So one of the examples I gave was, you have this stream of sales, but the sale record probably just has, like, your customer ID, right? But for a lot of what you would need to do to process a sale in different ways, you probably need a bunch of other information about the customer. Maybe you need their primary shipping address or whatever the case may be, so you probably need to join on that information. And so you could imagine the sale as being a stream of events. You could imagine the customer information as being a kind of table, and you're joining that stream and table together. And so it supports all the combinations.
Jay Kreps 00:36:17 You can join a stream to a stream. So in advertising, you have clicks and you have impressions, and one of the problems is often trying to say–hey, which impression, which viewing of the ad, led to the click on the ad? That's joining a stream to a stream, right? In normal databases, of course, you join tables; there's a lot of that. You might have streams coming out of different databases where you're kind of capturing the feed of changes, and you might want to aggregate all that. So a common example of this is where you have bits of data about your customer spread over many different systems in a company. Any big company ends up with this problem where you're like–hey, we know a lot about our customers, but you have to go to 27 systems to figure out the answer.
Jay Kreps 00:37:00 And that's an example where, if you treat the changes coming into each of those records in each of those systems as a kind of change log, then with Kafka Connect you can kind of extract that from those systems and you have this feed, and then using ksqlDB, you can effectively produce this kind of streaming join that gives you the end-all-be-all record for each customer that has all the information together. And so that's an example of a normal kind of table-to-table join. So all these combinations of streams and tables are actually pretty useful in this domain once you start to think about it.
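As a sketch in ksqlDB SQL, the stream-table enrichment case described above (each sale joined to the customer's current record as it arrives) might look like this. It assumes the sales stream carries a customer_id column; all names are illustrative.

```java
public class EnrichedSalesSketch {
    static final String STATEMENTS = """
        -- A table keyed by customer, fed by the change feed from the customer system.
        CREATE TABLE customers (customer_id VARCHAR PRIMARY KEY, name VARCHAR, address VARCHAR)
          WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');

        -- Stream-table join: every incoming sale is enriched with the customer's current address.
        CREATE STREAM enriched_sales AS
          SELECT s.order_id, s.amount, c.name, c.address
          FROM sales s
          LEFT JOIN customers c ON s.customer_id = c.customer_id
          EMIT CHANGES;
        """;
}
```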
Akshay Manchale 00:37:37 You mentioned earlier that it's possible to have multiple ksqlDB instances maintaining their own tables, and that nodes can come and go. So from a client perspective, can I join on data that's on two different ksqlDB nodes? Is there a distributed query of sorts, or is it isolated to a single source?
Jay Kreps 00:37:55 Yeah, so that's true in two different ways. So, a ksqlDB cluster is made out of multiple nodes, and then you can have many clusters working off of the same Kafka cluster. And so, I was actually saying both things. So, I could have my cluster, you could have your cluster, and then within my cluster, it's actually spread over multiple nodes. So the data itself is partitioned up, and of course, you would need to be able to join across different partitions. The actual join itself is performed locally, so it would reshuffle the data to get it onto the same node to do the join. It's not doing a remote RPC call for each lookup; under the hood it would be done locally. But yeah, you can join different topics together in different ways.
Akshay Manchale 00:38:40 So since you're using RocksDB as a backing storage layer, can you use RocksDB in conjunction with ksqlDB as a traditional relational database, create a classic table on RocksDB, and then use the two things to sort of query, ingest, et cetera?
Jay Kreps 00:38:56 Yeah, I mean, RocksDB itself, the DB in the name maybe oversells what it is to people; it is just kind of a put/get/scan interface in C++ and Java. So it doesn't support any SQL or anything like that. The table concept is, of course, using that under the hood to store these partitions of data in ksqlDB, and you can combine the tables and streams in ksqlDB, but there's not really a SQL interface for you to directly access the RocksDB part other than the ksqlDB interface itself. But I think the use case you would want to use that for, you actually can do in ksqlDB itself.
Akshay Manchale 00:39:33 So you did mention that ksqlDB has a schema of sorts where you can declare columns and a primary key and you are consuming from Kafka topics. So how do you deal with changes in schema from your upstream publishers of data?
Jay Kreps 00:39:51 Yeah, that's a great question. So yeah, I guess like any SQL layer, ksqlDB needs a notion of what the data is, and that lets it get into the records and munge them in a smart way. So you can kind of express that just yourself by saying–hey, I assert this is what's in this data feed. Maybe you have JSON records and you're saying, it's going to have these four fields, and that's the schema of my table. Obviously, if somebody upstream publishes mangled JSON, then you've got a problem. So it does also support usage with what we would call a schema registry. Confluent produces something that maintains these schemas along with Kafka, and it's actually a very important component for the usage of Kafka.
Jay Kreps 00:40:34 I talked about the usage of Kafka across different teams. Obviously that only works if we all understand each other's bytes, which means we have to agree on the format. And so this allows you to maintain the format of data in common formats, so protocol buffers or Avro or JSON. It'll store a schema for that, it'll check the data against that, and then you can express different compatibility rules. So you may have some Kafka topics where you say–hey, it's wild west, put whatever you want in, and good luck to whoever's downstream. The reality is, for important data, if you have a lot of applications building around it, you don't want to be in a situation where the person putting data in, or the many teams putting data in, have to go talk to all the teams downstream.
Jay Kreps 00:41:18 It just gets really hard to organizationally maintain correctness in that world. So, typically in those situations, you would enforce some notion of forwards or backwards compatibility, which controls what you're allowed to do, right? The most restrictive would be you can't change it at all. The reality is, in most businesses, the problem changes over time and the software needs to evolve, so that's usually too restrictive. But you usually do want to have some notion of compatibility around it: you can add fields, but you can't change the type of an existing field, because that's going to break downstream code. It's going to break ksqlDB, whatever is relying on the format of that. So you would want to use that schema registry to enforce these requirements. And that's actually quite common. This is a very popular open component that we produce that is very commonly used with Kafka itself.
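One small sketch of how that looks from the ksqlDB side: when a topic's records are Avro and the schema lives in Schema Registry, the stream can be declared without spelling out the columns, because ksqlDB looks the schema up in the registry. The topic name is an assumption, and the compatibility rules (backward or forward) are configured on the registry subject itself, not in this statement.

```java
public class AvroStreamSketch {
    static final String CREATE_FROM_REGISTRY = """
        -- No column list: ksqlDB reads the Avro schema for this topic from Schema Registry.
        CREATE STREAM customer_changes
          WITH (KAFKA_TOPIC='shop.customers', VALUE_FORMAT='AVRO');
        """;
}
```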
Akshay Manchale 00:42:13 Since you have a cluster of ksqlDB nodes, I can presumably write a query in different ways. If I go to a relational database and write a query, there are ways to sort of understand what the cost of the query is, see the plan, and kind of get some feedback about whether it could be written in a different way for faster access. So in the presence of this cluster of ksqlDB nodes that are maintaining their own tables and views, is there some assistance for query planning, optimization, and all of that?
Jay Kreps 00:42:41 Yeah, we have a little bit of functionality to let you understand what's going on under the hood. It's not as mature, I think, as what the more mature databases provide; some of that is because the pull queries themselves are actually simpler in terms of what we support, so there's less to debug there. What you actually need in addition to that is the ability to really change and evolve queries in a way you wouldn't in a relational database, right? Because if you have a query that just happens for a point in time and then is done, then you don't really need to evolve it in place. But with stream processing, you have these queries that might run forever, right? So if I'm computing how many sales happened in each city, at some point my logic for how I count that or how I define cities might change. And so in addition to the kind of explain-plan stuff, what you also need is the ability to go back and reprocess data. And that's something we've put some thought into. There's ongoing work to help with that kind of query evolution, to make it easier to do that in place without having to go back and reprocess data. So that's kind of an interesting nuance as well.
Akshay Manchale 00:43:48 And when it comes to actually accessing your data, since you have a cluster, is there a way to find the table, or do you need to know exactly where that node is, its IP address or a DNS name or whatever?
Jay Kreps 00:44:00 Yeah, the interaction with ksqlDB is very similar to most of the relational databases. So if you've used MySQL or Postgres or Oracle, there's a little command-line shell that you can hook up with. There's a REST API, but on the command-line shell, you can do exactly what you would expect, which is: show me the tables, show me the streams, I want to do this. You can kind of interactively develop these queries, see what the results of different things would be, and run these queries. You can manage some of the background queries, like kill the long-running ones. So, it's very similar to those systems in what it provides.
Akshay Manchale 00:44:35 Do you have any comments on the performance of ksqlDB and how quickly you can ingest things, in comparison with, say, a relational database? What's stopping you, or why use ksqlDB over, say, a relational database that could be ingesting the same amount of data, every event that comes in?
Jay Kreps 00:44:52 Yeah, that's a great question. So, if what you're building is ultimately a UI-driven app where there's a pile of data that kind of sits there and you just do lookups to show the result, then that's effectively what a relational database is built to do, and they do it pretty well. And there are 101 databases; there's probably one that's a good fit for whatever application you're building, and I don't know that there's really a motivation to adopt anything new there. But this other side of things, around stream processing, that's something that databases don't do at all. And so you end up pushed into this world where, like in the data warehouse world, you dump a bunch of data in and then at the end of the day you run some big batch computation to get it into shape, so everything's like a day old.
Jay Kreps 00:45:37 And that's no good in the modern world, and that's where stream processing shines. So I would say the goal of adopting this is not so much that Postgres wasn't fast enough. It's actually that the application you're building is actually a stream processing application. You're reacting to things as they occur, and there are a lot of use cases that just require that kind of access pattern. So then, how does the performance compare? Yeah, it's hard to compare because these other systems don't do stream processing, and ksqlDB just does relatively simple lookups. So if you run it through one of these data warehouse query benchmarks, it couldn't do most of the queries in that; it just does simple lookups on the pull query side.
Jay Kreps 00:46:22 So the pull queries are fast, but not nearly as expressive yet. That's an area we're adding to. And then, of course, the stream processing doesn't exist at all in those other systems. So it's probably hard to do an apples-to-apples comparison in terms of raw performance. For what you can use this on, there's kind of a per-node number, which is actually quite good: you can process hundreds of thousands of records per second on one node. But like a lot of these modern systems, it's a proper distributed system and it scales horizontally. So if you want more, then you just add more machines, and you can do that dynamically as it runs. So, as the query is processing, you can add more capacity to make it go faster. So, we've run on workloads that are actually massive in scale, where it's millions of records per second.
Akshay Manchale 00:47:12 That's great. So in Kafka, you could have older events sort of expiring or falling off the topic. So how does that flow back into ksqlDB? Let's say I have some expiration that says, just forget about events from 30 days ago, for compliance or whatever. How does that flow into your ksqlDB instance in terms of the aggregates you're computing or the transformations that you've done on top of the underlying data?
Jay Kreps 00:47:38 Yeah, you’re saying maybe we can start that one over. I’m not sure if I understood the question. So you’re saying like the compliance of the data, the retention,
Akshay Manchale 00:47:45 So retention, let's say retention of data in topics, you can configure that, right? So, since you have these push queries or pull queries, do you actually see data twice when things are deleted because of retention policies?
Jay Kreps 00:47:59 Yeah, that's a great question. So, in a stream, you would have this idea of maintaining data just for some period of time. In theory, that period of time could be forever, but if that's the case, you're going to need more storage over time as more data accumulates. In a table, it works just like a database does: the table data is stored forever. So if you have your customer accounts, the assumption is you wouldn't ever time out your customer accounts; those would stay forever. Under the hood, we do actually support a kind of windowed table. That's important for some of the windowing concepts, as you compute aggregates over a window, like how many people were born in the last however many days. But at least in the common case, you can think of tables as being just like a database table, where the data persists until you delete it.
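The windowed case mentioned here can be sketched in ksqlDB SQL with a windowed aggregate; the stream and column names are assumptions.

```java
public class WindowedCountSketch {
    static final String QUERY = """
        -- Births per city over non-overlapping one-day (tumbling) windows.
        SELECT city, COUNT(*) AS births_in_window
        FROM births
        WINDOW TUMBLING (SIZE 24 HOURS)
        GROUP BY city
        EMIT CHANGES;
        """;
}
```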
Akshay Manchale 00:48:50 So internally, was there any reason you chose RocksDB? What was the motivation behind that?
Jay Kreps 00:48:56 Yeah. Effectively we're just using some persistent key-value interface, and so it doesn't particularly matter what that key-value interface is. And there's a set of these different embeddable key-value interfaces that run as libraries. It would be overkill to have something that had a SQL layer of its own or a bunch of advanced database features. It literally is just a library that we use for maintaining data on disk, and all the distributed systems replication of data, SQL processing, that's all done by ksqlDB itself. So yeah, when you look at different libraries, they all have pluses and minuses. Technically it's actually pluggable within Kafka Streams, which is used for these underlying primitives, and so you can actually plug in anything you want. There's an all-in-memory version of it that could have some advantages. There are other systems out there you could adopt, but most people use RocksDB; there are pros and cons. I think it's extremely featureful and very high performance. The con is it has about a million tuning knobs. So if you don't like the performance, you can almost certainly fix it if you can discover the right tuning.
Akshay Manchale 00:50:16 So operationally, how is this packaged along with Kafka? Do I need to manage a separate RocksDB instance and all of that? Or is it just available out of the box?
Jay Kreps 00:50:26 No, no. So, I think you guys did a podcast on CockroachDB; they use it as well, right? So, for both of us, the word DB in RocksDB is maybe a misnomer. It is actually just a library and it accesses the local disk. And so it's really just a library that maintains a certain file format on disk. It's not something you would install separately. It's not something that's accessed remotely over a network. It's not something that has a SQL interface. So, anything which accesses data on disk you have to pay a little more attention to. But yeah, there's no operational component to it. Effectively, you start this ksqlDB process and it has everything it needs; what you have to tell ksqlDB is where Kafka is. And that's really all you have to have: Kafka and ksqlDB.
Akshay Manchale 00:51:21 Are there any anti-patterns in using ksqlDB for stream processing, for example when building a push-based application, using the Kafka Streams API versus ksqlDB? Are there anti-patterns in using one or the other where it's not a good fit?
Jay Kreps 00:51:37 Yeah, probably the biggest anti-pattern in stream processing, I think, is trying to do remote lookups for every record. And so this is kind of a very natural thing in software where, if you have one thing and you want to look up the corresponding record, you do some remote call to get that corresponding record. So in the example I gave, I had the sale, I want to look up the customer record. There's a tendency for engineers to want to take each thing and do a remote call for the other thing. And it usually just isn't the easiest, most correct, or most performant way to do that, because you have to reason about, well, what happens if that call fails, and how do I retry it, and all this stuff. So the more natural way of doing that in ksqlDB would be to capture the stream of changes on the customer data and capture the stream of sales, and then do that join within ksqlDB rather than trying to do it in the application space itself.
Jay Kreps 00:52:38 That will kind of have the right properties around reasoning about time, it'll have the right fault tolerance properties, and it'll be a lot faster, because you're not doing a remote call on each item with all the latency and whatever that implies. So that's probably the biggest anti-pattern that people miss, which is just a little different from how people use traditional databases, where you're often doing joins kind of in your code, over the network, between different things, and that works because you're typically just doing it for one or two rows, whereas in stream processing there's a whole feed of these. And especially when you go to rerun your processing, if you change your logic, you want it to work quickly so you can get caught back up to the current time.
Akshay Manchale 00:53:19 So what’s the future of ksqlDB? Where is it going?
Jay Kreps 00:53:22 Yeah, well our goal is to really make working with streams of events as easy as possible. And that’s really our vision at Confluent kind of end to end. And so there’s a lot of work to try and make that easy. Like we’re building all this as a cloud service to try and make the operations side of it go away. It’s making all the features available that just make it really easy to work with this and make it easy to work across a large company. So in ksqlDB, the parts of that that are important, I mentioned just completing the set of functionality that people want in pull queries, that’s a ton of work, because it’s like all the stuff databases do. So we got a long roadmap of things to fill out that side of things.
Jay Kreps 00:54:04 We’re still not trying to replace traditional databases for what they’re good at, like building kind of end-to-end UI driven apps, but we want to make these event stream processing architectures really simple and make the access to that data really easy in an integrated system. There’s work going on around making it really easy to test and evolve these queries, the kind of full life cycle of development for it. And then a ton of work on just the full completeness of all the processing capabilities, performance, all the operational side of it. So lots going on. I guess whenever you’re trying to build a database, it’s a lot of work. So, we’re not running out of things to do.
Akshay Manchale 00:54:43 So is ksqlDB open source?
Jay Kreps 00:54:45 Yeah, it's available under the community license that Confluent uses. And so it's not an OSI open-source license, but you can take it, you can modify the code, you don't have to pay us anything, and you can make changes to it and publish those. The only major restriction is around how it can be run as part of a kind of managed SaaS cloud service; that right is reserved. All the details are in the license itself, but that's the same license that we produce our schema registry and other components under, and it's very popular and gets a lot of free usage. And of course you can do exactly what you'd expect, which is go look at it all in GitHub and make your own fork of it.
Akshay Manchale 00:55:25 Awesome. So is there anything else that you want to add about ksqlDB?
Jay Kreps 00:55:30 No, I think if you're interested in databases and data systems, this world of events and event streaming is really becoming a big deal. It's kind of becoming part of the modern stack, and I think it's a great tool to make working with this really easy and productive. So, I'd urge people to go check it out and give it a spin. You can try it out pretty easily and there's a pretty active community around it, so if you kick the tires and there's something that doesn't make sense or doesn't work the way you expect, let us know. There's a set of people contributing patches and a bunch of people at Confluent working on it actively. So we'd love to hear feedback from people.
Akshay Manchale 00:56:05 I’ll include some notes in the show notes on how people can try ksqlDB. With that Jay, thanks for being here to talk about ksqlDB. This is Akshay Manchale for Software Engineering Radio. Thank you for listening.
Jay Kreps 00:56:17 Thanks so much for having me.
[End of Audio]
SE Radio theme music: “Broken Reality” by Kevin MacLeod (incompetech.com)
Licensed under Creative Commons: By Attribution 3.0