thearstbot-twitch/bot.py

300 lines
10 KiB
Python
Raw Permalink Normal View History

2022-09-12 19:02:45 -04:00
# Basic twitch bot
import os
2022-11-27 23:06:02 -05:00
import random
2022-09-13 23:00:59 -04:00
import sqlite3
import re
2022-09-12 19:02:45 -04:00
from twitchio.ext import commands
from twitchio.ext import pubsub
2022-09-12 19:02:45 -04:00
from dotenv import load_dotenv
2023-02-15 22:32:09 -05:00
import requests
2022-09-12 19:02:45 -04:00
2022-09-12 22:17:49 -04:00
PREFIX = "!"
2022-09-13 23:00:59 -04:00
DB_PATH = "storage.db"
CHESS_RE = re.compile("\\b(([KQRBN]?[a-h]?[1-8]?x?[a-h][1-8](=?[QRBNqrbn])?)|(O-O(-O)?))[+#]?\\b")
2022-09-12 22:17:49 -04:00
2022-09-12 19:02:45 -04:00
class Bot(commands.Bot):
def __init__(self):
2022-09-12 22:17:49 -04:00
super().__init__(token = os.environ["TMI_TOKEN"], prefix=PREFIX, initial_channels=[os.environ["CHANNEL"]])
2022-09-13 23:00:59 -04:00
self.db_con = sqlite3.connect(DB_PATH)
self.db_cur = self.db_con.cursor()
self.db_cur.execute("CREATE TABLE IF NOT EXISTS custom_commands(command TEXT, output_text TEXT, author TEXT)")
self.db_cur.execute("CREATE TABLE IF NOT EXISTS channel_reward_messages(title TEXT, output_text TEXT)")
print("Created database table")
commands = self.db_cur.execute("SELECT count(*) FROM custom_commands").fetchone()
print(f"{commands[0]} custom commands available")
2022-09-12 19:02:45 -04:00
self.shuffle_message = "cubes are just balls with corners lmao"
self.peer_pressure_message = ""
self.peer_pressure_users = []
self.hydrate_count = 0
2023-02-15 22:41:25 -05:00
# Adds a custom command from a string
async def add_custom_command(self, ctx, command_str, author):
args = command_str.split(" ", 1)
if len(args) < 2:
await ctx.send("lmao plz supply 2 args")
return
command_name = args[0]
command_text = args[1]
if not command_name.isalnum():
await ctx.send("lmao that command name is too funky")
return
if self.db_cur.execute("SELECT * FROM custom_commands WHERE command=?", [command_name]).fetchone() is not None \
or self.get_command(command_name) is not None:
await ctx.send("lmao that command already exists")
return
self.db_cur.execute("INSERT INTO custom_commands VALUES (?, ?, ?)", [command_name, command_text, author])
self.db_con.commit()
await ctx.send(f"Adding command: \"{PREFIX}{command_name}\" -> \"{command_text}\"")
2022-09-12 19:02:45 -04:00
async def event_ready(self):
print(f"Logged in as | {self.nick}")
print(f"User id is | {self.user_id}")
# This entire pubsub thing feels so janky... There simply must be a better way to do it
if "PUBSUB_TOKEN" in os.environ and "PUBSUB_USER_ID" in os.environ:
self.pubsub = pubsub.PubSubPool(self)
topics = [
pubsub.channel_points(os.environ["PUBSUB_TOKEN"])[int(os.environ["PUBSUB_USER_ID"])]
]
await self.pubsub.subscribe_topics(topics)
# Can I put this function somewhere else LMAO
@self.event()
async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage):
reward_title = event.reward.title
if reward_title == "Hydrate!": # Hack for hydrate count...
self.hydrate_count += 1
result = self.db_cur.execute("SELECT output_text FROM channel_reward_messages WHERE title=?", [reward_title]).fetchone()
if result is not None:
message = result[0]
message = message.replace("%USER%", event.user.name)
if event.input:
message = message.replace("%INPUT%", event.input)
await self.get_channel(os.environ["CHANNEL"]).send(message)
2022-09-12 22:13:41 -04:00
async def event_message(self, message):
if message.echo:
return
if "CHESS_ADDR" in os.environ and "CHESS_TOKEN" in os.environ:
await self.handle_chess(message)
2022-09-12 22:13:41 -04:00
# I really want to do this with the add_command function and have no need for this event_message override, but
# I cannot for the life of me figure out how to make "anonymous" coroutines (like, async lambda or something),
# so I'm just manually handling the command here before passing to the command handler
if message.content.startswith(PREFIX):
first_token = message.content[len(PREFIX):].strip().split()[0]
res = self.db_cur.execute("SELECT output_text FROM custom_commands WHERE command=?", [first_token]).fetchone()
if res is not None:
output = res[0]
output = output.replace("%USER%", message.author.name)
await message.channel.send(output)
2022-09-12 22:13:41 -04:00
return
content = message.content.replace(u"\U000E0000", "").strip()
if content == self.peer_pressure_message:
if not message.author.name in self.peer_pressure_users:
self.peer_pressure_users.append(message.author.name)
if len(self.peer_pressure_users) >= 3:
await message.channel.send(content)
self.peer_pressure_users = []
else:
self.peer_pressure_message = content
self.peer_pressure_users = [message.author.name]
2022-09-12 22:13:41 -04:00
await self.handle_commands(message)
if len(content.split(" ")) >= 3:
self.shuffle_message = content
async def handle_chess(self, message):
# Don't do this if this is already a chess command (e.g. likely to have false positives when entering FEN)
if message.content.startswith(PREFIX) and message.content[len(PREFIX):].strip().split()[0] == "chess":
return
# Check if the current message contains a chess move and play it
move_match = CHESS_RE.search(message.content)
if move_match is not None:
move = move_match.group()
headers = {"Authorization": f"Bearer {os.environ['CHESS_TOKEN']}"}
params = {"chatter": message.author.name, "fullMessage": message.content}
raw_response = requests.post(f"{os.environ['CHESS_ADDR']}/api/move/{move}", params=params, headers=headers)
response = raw_response.json()
if "error" in response:
await message.channel.send(f"Error: {response['error']}")
else:
output = f"Playing move {response['move']['san']}"
if response["draw"]:
output += ", game ends in a draw!"
elif response["checkmate"]:
output += f", {'black' if response['move']['color'] == 'b' else 'white'} wins by checkmate!"
output += " !chess"
await message.channel.send(output)
2023-02-15 22:41:25 -05:00
2022-09-12 19:02:45 -04:00
@commands.command()
async def hello(self, ctx: commands.Context):
# Basic hello world command, executed with "!hello". Reply hello to whoever made the command
2022-09-12 19:44:30 -04:00
response = f"Hello {ctx.author.name}! "
if ctx.author.is_broadcaster:
response += "👑"
else:
if ctx.author.is_mod:
response += ""
# Uncomment these once twitchio gets a new release
#elif ctx.author.is_vip:
# response += "💎"
2022-09-12 19:44:30 -04:00
if ctx.author.is_subscriber:
response += "🤑"
await ctx.send(response.strip())
2022-09-12 19:02:45 -04:00
2022-09-12 22:13:41 -04:00
@commands.command()
async def addcmd(self, ctx: commands.Context):
2022-09-12 22:13:41 -04:00
# Lets moderators add custom text responses
if not ctx.author.is_mod:
await ctx.send("lmao nice try ur not a mod")
return
# Not sure if ctx.args is supposed to work but it seems like it doesn't...
# I want the last arg to not get split anyway, so I do it myself
args = ctx.message.content.split(" ", 1)
if len(args) < 2:
await ctx.send("lmao plz supply 2 args")
2022-09-12 22:13:41 -04:00
return
await self.add_custom_command(ctx, args[1], ctx.author.name)
2022-09-12 22:13:41 -04:00
2022-09-12 22:25:54 -04:00
@commands.command()
async def removecmd(self, ctx: commands.Context):
2022-09-12 22:25:54 -04:00
if not ctx.author.is_mod:
await ctx.send("lmao nice try ur not a mod")
return
args = ctx.message.content.split(" ", 1)
if len(args) < 2:
await ctx.send("lmao wut command")
return
command_name = args[1]
if self.get_command(command_name) is not None:
await ctx.send("lmao u cant delet this")
return
if self.db_cur.execute("SELECT * FROM custom_commands WHERE command=?", [command_name]).fetchone() is None:
2022-09-12 22:25:54 -04:00
await ctx.send("lmao wut is that command")
return
2022-09-13 23:00:59 -04:00
self.db_cur.execute("DELETE FROM custom_commands WHERE command=?", [command_name])
self.db_con.commit()
2022-09-12 22:25:54 -04:00
await ctx.send(f"Deleted command \"{PREFIX}{command_name}\"")
2022-09-12 23:25:48 -04:00
@commands.command()
async def help(self, ctx: commands.Context):
message = "Available commands:"
for command in self.commands:
if command not in ["addcmd", "removecmd"] or ctx.author.is_mod:
message += f" {PREFIX}{command}"
for command in self.db_cur.execute("SELECT command FROM custom_commands").fetchall():
message += f" {PREFIX}{command[0]}"
2022-09-12 23:25:48 -04:00
await ctx.send(message)
2022-11-27 23:06:02 -05:00
@commands.command()
async def balls(self, ctx: commands.Context):
word_list = "cubes are just balls with corners lmao".split(" ")
random.shuffle(word_list)
await ctx.send(" ".join(word_list))
2023-02-15 22:32:09 -05:00
@commands.command()
async def donation(self, ctx: commands.Context):
r = requests.get("https://taskinoz.com/gdq/api/")
await ctx.send(r.text)
2023-02-15 22:41:25 -05:00
@commands.command()
async def shuffle(self, ctx: commands.Context):
shuffled_message_list = self.shuffle_message.split(" ")
2023-02-15 22:41:25 -05:00
random.shuffle(shuffled_message_list)
await ctx.send(" ".join(shuffled_message_list))
# Requested by linguini15
@commands.command()
async def shape(self, ctx: commands.Context):
platonic_solids = ["tetrahedron", "hexahedron", "octahedron", "dodecahedron", "icosahedron"]
solid = random.choice(platonic_solids)
message = f"did you guys know the marble is a{'n' if solid[0] in ['i', 'o'] else ''} {solid}!?!?!?"
await ctx.send(message)
@commands.command()
async def dot(self, ctx: commands.Context):
responses = [
"Oh, no way! It's another door!",
"You found a... What is this? I don't even...",
"Hey, listen!",
"Owls creep me out.",
"I can't remember what that is. I forget... My memory isn't what it used to be. I can't remember...",
"Hello, what's this? You found a secret passage! Who knows where it leads? I don't.",
"I'm as confused as you are. I have no idea.",
"It looks like a book. Yup! it's a book!",
"You have found a treasure map! A map of what? to where? I don't know! Figure it out yourself!",
"I wonder what this is...",
"I wonder what this means...",
"Something like, oh, I don't know...",
"...",
"Uh?",
"What does it mean?"
]
await ctx.send(random.choice(responses))
# Requested by friendsupporter3829
@commands.command()
async def hydratecount(self, ctx: commands.Context):
await ctx.send(f"Number of hydrates this stream: {self.hydrate_count}")
# Requested by linguini15
@commands.command()
async def calc(self, ctx: commands.Context):
args = ctx.message.content.split(" ", 1)
if len(args) < 2:
await ctx.send("Requires argument to calculate") # No it doesn't lmfao
return
components_seed = random.random()
has_real = components_seed <= 0.85
has_imaginary = components_seed >= 0.6
message = ""
if has_real:
real = random.triangular(-20, 20)
message += f"{real:.2f}"
if random.random() > 0.75:
message += "π"
if has_imaginary:
imag = random.triangular(-20, 20)
if has_real:
message += f" {'+' if imag >= 0 else '-'} "
imag = abs(imag)
message += f"{imag:.2f}"
if random.random() > 0.75:
message += "π"
message += "i"
await ctx.send(message)
# TODO - !weather (requested by linguini15), !chess
2022-09-12 19:02:45 -04:00
def main():
load_dotenv()
bot = Bot()
bot.run()
# bot.run() is blocking and will stop execution of any below code here until stopped or closed.
if __name__ == "__main__":
main()