Mastering the Bluesky Firehose with Python: A Simple Tutorial
![Mastering the Bluesky Firehose with Python: A Simple Tutorial](/content/images/size/w1200/2024/12/follow-1hour-1.png)
Bluesky is awesome for many reasons, but one feature that not many people know about is the ability to collect data through the publicly available API, thanks to the AT Protocol, the system that runs Bluesky. In particular, the API lets you stream activity in real time using the firehose. By tapping into this live stream and filtering it as it arrives, you can collect posts that mention a particular keyword or hashtag, or that come from a specific set of users.
Want to see the firehose at work for yourself? Check out https://firesky.tv/.
In this post, we'll cover the basics of connecting to the Bluesky firehose using Python, so you too can stream data in real time.
Getting Started
Before we get started with using the firehose, there's some AT protocol jargon worth explaining:
- Repo: A user's personal data repository in the AT Protocol, storing posts, likes, follows, and other activities.
- Commit: A recorded change or update to a user's repo, similar to a version control checkpoint.
- CAR (Content Addressable aRchive): A file format used to package and transfer blocks of data efficiently within the AT Protocol ecosystem.
- Block: A unit of data or record stored in a repo, referenced by its content identifier (CID).
- Lexicon: The schema or data modelling system defining types, methods, and validation rules in the AT Protocol.
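To make the jargon a little more concrete: each operation in a commit carries a path of the form `collection/record-key`, where the collection is a Lexicon ID such as `app.bsky.feed.post`. Combined with the repo's DID, that path gives an `at://` URI pointing at a single record. The sketch below assumes that path layout; `split_op_path` and `to_at_uri` are hypothetical helpers (not part of the `atproto` package), and the DID and record key are made-up example values.

```python
from typing import NamedTuple


class RecordPath(NamedTuple):
    collection: str  # Lexicon ID, e.g. "app.bsky.feed.post"
    rkey: str        # record key within that collection


def split_op_path(path: str) -> RecordPath:
    """Split a repo op path ("collection/rkey") into its parts (hypothetical helper)."""
    collection, _, rkey = path.partition("/")
    if not collection or not rkey:
        raise ValueError(f"unexpected op path: {path!r}")
    return RecordPath(collection, rkey)


def to_at_uri(did: str, path: str) -> str:
    """Build an at:// URI for a record in a given repo (hypothetical helper)."""
    parts = split_op_path(path)
    return f"at://{did}/{parts.collection}/{parts.rkey}"


# Example values only: this DID and record key are invented for illustration.
example_path = "app.bsky.feed.post/3kexamplerkey"
print(split_op_path(example_path))
print(to_at_uri("did:plc:exampleuser123", example_path))
```

The collection half of the path is the same Lexicon type you will later see in a decoded record's `$type` field.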
With that cleared up, we'll connect to Bluesky's API using the "work-in-progress" `atproto` Python package, which can be installed using `pip install atproto`. If you're really tech-savvy, check out Bluesky's `goat` CLI tool.
The Code
Let's start with some imports. Hopefully these will make more sense shortly.
```python
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, models, CAR

...
```
Next, we need to create a client that subscribes to the firehose, using `FirehoseSubscribeReposClient`:
```python
...

client = FirehoseSubscribeReposClient()

...
```
With our client in place, we need to register a message handler to manage incoming data from the firehose. Here's an example:
```python
...

def on_message_handler(message):
    ...

client.start(on_message_handler)
```
Within `on_message_handler`, we need to check whether the incoming message is a commit and whether it contains any blocks.
```python
...

def on_message_handler(message):
    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return
    ...

client.start(on_message_handler)
```
With the commit's blocks in hand, we decode them using `CAR` to recover the original records. We can then check whether each operation is a "create" action, which focuses on new activity (such as posts and replies) as it appears in real time.
```python
...

def on_message_handler(message):
    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return
    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action == "create" and op.cid:
            data = car.blocks.get(op.cid)
            ...

client.start(on_message_handler)
```
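The create-and-CID check above is doing real work: besides "create", a commit can contain "update" and "delete" operations, and deletes carry no CID because there is no new record to point at. To try the filter offline, the sketch below uses a hypothetical `FakeOp` dataclass that only mimics the shape of a firehose operation; it is not the `atproto` model, and the paths and CIDs are placeholder strings.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass
class FakeOp:
    """Hypothetical stand-in for a firehose repo operation (not the atproto model)."""
    action: str         # "create", "update" or "delete"
    path: str           # "collection/rkey"
    cid: Optional[str]  # None for deletes


def new_records(ops: Iterable[FakeOp]) -> List[FakeOp]:
    """Keep only operations that create a record and point at a block."""
    return [op for op in ops if op.action == "create" and op.cid]


ops = [
    FakeOp("create", "app.bsky.feed.post/a1", "cid-a1"),
    FakeOp("update", "app.bsky.actor.profile/self", "cid-p1"),
    FakeOp("delete", "app.bsky.feed.like/l1", None),
]
for op in new_records(ops):
    print(op.path)
```

Dropping updates means edited profiles and similar changes are ignored; if you need them, widen the action check rather than removing the CID test.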
Next, we can pick out posts by filtering for records whose Lexicon type is `app.bsky.feed.post`, and then check whether the text contains a keyword ("coffee" in my case, which also catches the #coffee hashtag, because why not).
```python
...

def on_message_handler(message):
    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    if not commit.blocks:
        return
    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        if op.action == "create" and op.cid:
            data = car.blocks.get(op.cid)
            # Skip ops whose record block isn't present in this commit
            if not data:
                continue
            if data.get('$type') == 'app.bsky.feed.post':
                text = data.get('text', '')
                if 'coffee' in text:
                    print(text)

client.start(on_message_handler)
```
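The `'coffee' in text` check is case-sensitive, so a post saying "Coffee time!" slips through. A slightly more forgiving matcher could lowercase the text and also look at the post record's optional `tags` field, which holds extra hashtags that don't appear in the text. This is a minimal sketch assuming decoded records are plain dicts like `data` above; `matches_keyword` is a hypothetical helper and the sample records are hand-written.

```python
from typing import Any, Dict


def matches_keyword(record: Dict[str, Any], keyword: str) -> bool:
    """Case-insensitive match on post text, or on the optional 'tags' list (hypothetical helper)."""
    kw = keyword.lower()
    text = record.get("text", "").lower()
    # Tags are stored without the leading '#', so strip it before comparing
    extra_tags = {tag.lower() for tag in record.get("tags", [])}
    return kw in text or kw.lstrip("#") in extra_tags


# Hand-written sample records shaped like decoded app.bsky.feed.post records
posts = [
    {"$type": "app.bsky.feed.post", "text": "Coffee time!"},
    {"$type": "app.bsky.feed.post", "text": "Morning brew", "tags": ["Coffee"]},
    {"$type": "app.bsky.feed.post", "text": "Tea all day"},
]
for post in posts:
    print(matches_keyword(post, "coffee"), post["text"])
```

Inside the handler, the `if 'coffee' in text:` line could then become `if matches_keyword(data, 'coffee'):`.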
Putting it all together gives us this:
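```python
from atproto import FirehoseSubscribeReposClient, parse_subscribe_repos_message, models, CAR

# Client for the firehose's subscribeRepos event stream
client = FirehoseSubscribeReposClient()


def on_message_handler(message):
    # Decode the raw frame; we only care about repo commits
    commit = parse_subscribe_repos_message(message)
    if not isinstance(commit, models.ComAtprotoSyncSubscribeRepos.Commit):
        return
    # Commits without blocks have no records to decode
    if not commit.blocks:
        return
    # The blocks arrive as a CAR archive; decode them into records keyed by CID
    car = CAR.from_bytes(commit.blocks)
    for op in commit.ops:
        # Only look at newly created records
        if op.action == "create" and op.cid:
            data = car.blocks.get(op.cid)
            if not data:
                continue
            # Print posts whose text mentions the keyword
            if data.get('$type') == 'app.bsky.feed.post':
                text = data.get('text', '')
                if 'coffee' in text:
                    print(text)


# Start streaming; each incoming message is passed to the handler
client.start(on_message_handler)
```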
More Firehose Fun
We can do much more with the firehose than stream posts. By checking `data['$type']`, we can stream a whole range of activities, including the following, to list just a few:
- Likes: `app.bsky.feed.like`
- Reposts: `app.bsky.feed.repost`
- Blocks: `app.bsky.graph.block`
- Follows: `app.bsky.graph.follow`
- Starter Packs: `app.bsky.graph.starterpack`
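Because every decoded record carries its Lexicon type in `$type`, a single handler can route many kinds of activity. Here is a minimal sketch, assuming records have already been decoded as in the handler above; `tally_activity` is a hypothetical helper, the labels are my own, and the sample list is made up.

```python
from collections import Counter
from typing import Any, Dict, Iterable

# Lexicon types from the list above, mapped to human-readable labels
ACTIVITY_LABELS = {
    "app.bsky.feed.post": "post",
    "app.bsky.feed.like": "like",
    "app.bsky.feed.repost": "repost",
    "app.bsky.graph.block": "block",
    "app.bsky.graph.follow": "follow",
    "app.bsky.graph.starterpack": "starter pack",
}


def tally_activity(records: Iterable[Dict[str, Any]]) -> Counter:
    """Count records by activity label; unlisted types are grouped as 'other'."""
    counts: Counter = Counter()
    for record in records:
        counts[ACTIVITY_LABELS.get(record.get("$type"), "other")] += 1
    return counts


# Made-up sample of decoded records
sample = [
    {"$type": "app.bsky.feed.like"},
    {"$type": "app.bsky.feed.like"},
    {"$type": "app.bsky.graph.follow"},
    {"$type": "app.bsky.actor.profile"},
]
print(tally_activity(sample))
```

In a live handler you would update one shared `Counter` per record instead of building a list first, since the stream never ends.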
There's no end of possibilities here, so be as creative as you like. Using some slightly modified code, here's an example of what a follower network looks like based on one hour's worth of activity on Bluesky. As you can see, the results are quite impressive.
![](https://jrashford.com/content/images/2024/12/follow-1hour.png)
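The modified code behind that graph isn't shown, but the core idea can be sketched. In a follow record, the followed account is the `subject` DID, while the follower is the repo the commit came from (the parsed commit's `repo` field). The sketch below assumes you have collected (repo DID, decoded record) pairs that way; `build_follow_edges` and `top_followed` are hypothetical helpers rather than the author's code, and the DIDs are invented.

```python
from collections import Counter, defaultdict
from typing import Any, Dict, Iterable, List, Set, Tuple

FOLLOW = "app.bsky.graph.follow"


def build_follow_edges(
    events: Iterable[Tuple[str, Dict[str, Any]]],
) -> Dict[str, Set[str]]:
    """Map each follower DID to the set of DIDs it followed."""
    edges: Dict[str, Set[str]] = defaultdict(set)
    for repo_did, record in events:
        if record.get("$type") == FOLLOW and "subject" in record:
            edges[repo_did].add(record["subject"])
    return edges


def top_followed(edges: Dict[str, Set[str]], n: int = 3) -> List[Tuple[str, int]]:
    """Return the n most-followed DIDs in the collected window."""
    counts = Counter(subject for targets in edges.values() for subject in targets)
    return counts.most_common(n)


# Invented example events: (repo DID of the follower, decoded record)
events = [
    ("did:plc:alice", {"$type": FOLLOW, "subject": "did:plc:carol"}),
    ("did:plc:bob", {"$type": FOLLOW, "subject": "did:plc:carol"}),
    ("did:plc:bob", {"$type": FOLLOW, "subject": "did:plc:dave"}),
    ("did:plc:bob", {"$type": "app.bsky.feed.like", "subject": {}}),
]
edges = build_follow_edges(events)
print(top_followed(edges))
```

The resulting edge dictionary can then be handed to a graph library such as NetworkX for layout and drawing.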