2022-09-12 19:02:45 -04:00
|
|
|
# Basic twitch bot
|
|
|
|
|
|
|
|
import os
|
2022-11-27 23:06:02 -05:00
|
|
|
import random
|
2022-09-13 23:00:59 -04:00
|
|
|
import sqlite3
|
2024-12-27 01:40:14 -05:00
|
|
|
import re
|
2022-09-12 19:02:45 -04:00
|
|
|
from twitchio.ext import commands
|
2022-10-02 16:39:38 -04:00
|
|
|
from twitchio.ext import pubsub
|
2022-09-12 19:02:45 -04:00
|
|
|
from dotenv import load_dotenv
|
2023-02-15 22:32:09 -05:00
|
|
|
import requests
|
2022-09-12 19:02:45 -04:00
|
|
|
|
2022-09-12 22:17:49 -04:00
|
|
|
PREFIX = "!"
|
2022-09-13 23:00:59 -04:00
|
|
|
DB_PATH = "storage.db"
|
2024-12-27 01:40:14 -05:00
|
|
|
CHESS_RE = re.compile("\\b(([KQRBN]?[a-h]?[1-8]?x?[a-h][1-8](=?[QRBNqrbn])?)|(O-O(-O)?))[+#]?\\b")
|
2022-09-12 22:17:49 -04:00
|
|
|
|
2022-09-12 19:02:45 -04:00
|
|
|
|
|
|
|
class Bot(commands.Bot):
|
|
|
|
def __init__(self):
|
2022-09-12 22:17:49 -04:00
|
|
|
super().__init__(token = os.environ["TMI_TOKEN"], prefix=PREFIX, initial_channels=[os.environ["CHANNEL"]])
|
2022-09-13 23:00:59 -04:00
|
|
|
self.db_con = sqlite3.connect(DB_PATH)
|
|
|
|
self.db_cur = self.db_con.cursor()
|
|
|
|
|
2022-09-19 00:04:55 -04:00
|
|
|
self.db_cur.execute("CREATE TABLE IF NOT EXISTS custom_commands(command TEXT, output_text TEXT, author TEXT)")
|
2022-10-15 17:16:19 -04:00
|
|
|
self.db_cur.execute("CREATE TABLE IF NOT EXISTS channel_reward_messages(title TEXT, output_text TEXT)")
|
2022-09-19 00:04:55 -04:00
|
|
|
print("Created database table")
|
|
|
|
commands = self.db_cur.execute("SELECT count(*) FROM custom_commands").fetchone()
|
|
|
|
print(f"{commands[0]} custom commands available")
|
2022-09-12 19:02:45 -04:00
|
|
|
|
2024-12-27 01:40:14 -05:00
|
|
|
self.shuffle_message = "cubes are just balls with corners lmao"
|
|
|
|
|
|
|
|
self.peer_pressure_message = ""
|
|
|
|
self.peer_pressure_users = []
|
|
|
|
|
|
|
|
self.hydrate_count = 0
|
2023-02-15 22:41:25 -05:00
|
|
|
|
2022-10-02 16:39:38 -04:00
|
|
|
# Adds a custom command from a string
|
|
|
|
async def add_custom_command(self, ctx, command_str, author):
|
|
|
|
args = command_str.split(" ", 1)
|
|
|
|
if len(args) < 2:
|
|
|
|
await ctx.send("lmao plz supply 2 args")
|
|
|
|
return
|
|
|
|
command_name = args[0]
|
|
|
|
command_text = args[1]
|
|
|
|
|
|
|
|
if not command_name.isalnum():
|
|
|
|
await ctx.send("lmao that command name is too funky")
|
|
|
|
return
|
|
|
|
|
|
|
|
if self.db_cur.execute("SELECT * FROM custom_commands WHERE command=?", [command_name]).fetchone() is not None \
|
|
|
|
or self.get_command(command_name) is not None:
|
|
|
|
await ctx.send("lmao that command already exists")
|
|
|
|
return
|
|
|
|
|
|
|
|
self.db_cur.execute("INSERT INTO custom_commands VALUES (?, ?, ?)", [command_name, command_text, author])
|
|
|
|
self.db_con.commit()
|
|
|
|
await ctx.send(f"Adding command: \"{PREFIX}{command_name}\" -> \"{command_text}\"")
|
|
|
|
|
2022-09-12 19:02:45 -04:00
|
|
|
async def event_ready(self):
|
|
|
|
print(f"Logged in as | {self.nick}")
|
|
|
|
print(f"User id is | {self.user_id}")
|
|
|
|
|
2022-10-02 16:39:38 -04:00
|
|
|
# This entire pubsub thing feels so janky... There simply must be a better way to do it
|
|
|
|
if "PUBSUB_TOKEN" in os.environ and "PUBSUB_USER_ID" in os.environ:
|
|
|
|
self.pubsub = pubsub.PubSubPool(self)
|
|
|
|
topics = [
|
|
|
|
pubsub.channel_points(os.environ["PUBSUB_TOKEN"])[int(os.environ["PUBSUB_USER_ID"])]
|
|
|
|
]
|
|
|
|
await self.pubsub.subscribe_topics(topics)
|
|
|
|
|
|
|
|
# Can I put this function somewhere else LMAO
|
|
|
|
@self.event()
|
|
|
|
async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage):
|
2022-10-15 17:16:19 -04:00
|
|
|
reward_title = event.reward.title
|
2024-12-27 01:40:14 -05:00
|
|
|
if reward_title == "Hydrate!": # Hack for hydrate count...
|
|
|
|
self.hydrate_count += 1
|
2022-10-15 17:16:19 -04:00
|
|
|
result = self.db_cur.execute("SELECT output_text FROM channel_reward_messages WHERE title=?", [reward_title]).fetchone()
|
|
|
|
if result is not None:
|
|
|
|
message = result[0]
|
|
|
|
message = message.replace("%USER%", event.user.name)
|
|
|
|
if event.input:
|
|
|
|
message = message.replace("%INPUT%", event.input)
|
|
|
|
await self.get_channel(os.environ["CHANNEL"]).send(message)
|
2022-10-02 16:39:38 -04:00
|
|
|
|
2022-09-12 22:13:41 -04:00
|
|
|
async def event_message(self, message):
|
|
|
|
if message.echo:
|
|
|
|
return
|
|
|
|
|
2024-12-27 01:40:14 -05:00
|
|
|
if "CHESS_ADDR" in os.environ and "CHESS_TOKEN" in os.environ:
|
|
|
|
await self.handle_chess(message)
|
|
|
|
|
2022-09-12 22:13:41 -04:00
|
|
|
# I really want to do this with the add_command function and have no need for this event_message override, but
|
|
|
|
# I cannot for the life of me figure out how to make "anonymous" coroutines (like, async lambda or something),
|
|
|
|
# so I'm just manually handling the command here before passing to the command handler
|
2022-09-13 23:04:41 -04:00
|
|
|
if message.content.startswith(PREFIX):
|
|
|
|
first_token = message.content[len(PREFIX):].strip().split()[0]
|
2022-09-13 23:17:05 -04:00
|
|
|
res = self.db_cur.execute("SELECT output_text FROM custom_commands WHERE command=?", [first_token]).fetchone()
|
|
|
|
if res is not None:
|
2024-12-27 01:40:14 -05:00
|
|
|
output = res[0]
|
|
|
|
output = output.replace("%USER%", message.author.name)
|
|
|
|
await message.channel.send(output)
|
2022-09-12 22:13:41 -04:00
|
|
|
return
|
|
|
|
|
2024-12-27 01:40:14 -05:00
|
|
|
content = message.content.replace(u"\U000E0000", "").strip()
|
|
|
|
if content == self.peer_pressure_message:
|
|
|
|
if not message.author.name in self.peer_pressure_users:
|
|
|
|
self.peer_pressure_users.append(message.author.name)
|
|
|
|
if len(self.peer_pressure_users) >= 3:
|
|
|
|
await message.channel.send(content)
|
|
|
|
self.peer_pressure_users = []
|
|
|
|
else:
|
|
|
|
self.peer_pressure_message = content
|
|
|
|
self.peer_pressure_users = [message.author.name]
|
|
|
|
|
2022-09-12 22:13:41 -04:00
|
|
|
await self.handle_commands(message)
|
|
|
|
|
2024-12-27 01:40:14 -05:00
|
|
|
if len(content.split(" ")) >= 3:
|
|
|
|
self.shuffle_message = content
|
|
|
|
|
|
|
|
async def handle_chess(self, message):
|
|
|
|
# Don't do this if this is already a chess command (e.g. likely to have false positives when entering FEN)
|
|
|
|
if message.content.startswith(PREFIX) and message.content[len(PREFIX):].strip().split()[0] == "chess":
|
|
|
|
return
|
|
|
|
|
|
|
|
# Check if the current message contains a chess move and play it
|
|
|
|
move_match = CHESS_RE.search(message.content)
|
|
|
|
if move_match is not None:
|
|
|
|
move = move_match.group()
|
|
|
|
headers = {"Authorization": f"Bearer {os.environ['CHESS_TOKEN']}"}
|
|
|
|
params = {"chatter": message.author.name, "fullMessage": message.content}
|
|
|
|
raw_response = requests.post(f"{os.environ['CHESS_ADDR']}/api/move/{move}", params=params, headers=headers)
|
|
|
|
response = raw_response.json()
|
|
|
|
if "error" in response:
|
|
|
|
await message.channel.send(f"Error: {response['error']}")
|
|
|
|
else:
|
|
|
|
output = f"Playing move {response['move']['san']}"
|
|
|
|
if response["draw"]:
|
|
|
|
output += ", game ends in a draw!"
|
|
|
|
elif response["checkmate"]:
|
|
|
|
output += f", {'black' if response['move']['color'] == 'b' else 'white'} wins by checkmate!"
|
|
|
|
output += " !chess"
|
|
|
|
await message.channel.send(output)
|
2023-02-15 22:41:25 -05:00
|
|
|
|
2022-09-12 19:02:45 -04:00
|
|
|
@commands.command()
|
|
|
|
async def hello(self, ctx: commands.Context):
|
|
|
|
# Basic hello world command, executed with "!hello". Reply hello to whoever made the command
|
2022-09-12 19:44:30 -04:00
|
|
|
response = f"Hello {ctx.author.name}! "
|
|
|
|
if ctx.author.is_broadcaster:
|
|
|
|
response += "👑"
|
|
|
|
else:
|
|
|
|
if ctx.author.is_mod:
|
|
|
|
response += "⚔"
|
2022-09-17 00:12:29 -04:00
|
|
|
# Uncomment these once twitchio gets a new release
|
|
|
|
#elif ctx.author.is_vip:
|
|
|
|
# response += "💎"
|
2022-09-12 19:44:30 -04:00
|
|
|
if ctx.author.is_subscriber:
|
|
|
|
response += "🤑"
|
|
|
|
await ctx.send(response.strip())
|
2022-09-12 19:02:45 -04:00
|
|
|
|
2022-09-12 22:13:41 -04:00
|
|
|
@commands.command()
|
2022-09-18 22:15:34 -04:00
|
|
|
async def addcmd(self, ctx: commands.Context):
|
2022-09-12 22:13:41 -04:00
|
|
|
# Lets moderators add custom text responses
|
|
|
|
if not ctx.author.is_mod:
|
|
|
|
await ctx.send("lmao nice try ur not a mod")
|
|
|
|
return
|
|
|
|
# Not sure if ctx.args is supposed to work but it seems like it doesn't...
|
|
|
|
# I want the last arg to not get split anyway, so I do it myself
|
2022-10-02 16:39:38 -04:00
|
|
|
args = ctx.message.content.split(" ", 1)
|
|
|
|
if len(args) < 2:
|
|
|
|
await ctx.send("lmao plz supply 2 args")
|
2022-09-12 22:13:41 -04:00
|
|
|
return
|
2022-10-02 16:39:38 -04:00
|
|
|
await self.add_custom_command(ctx, args[1], ctx.author.name)
|
2022-09-12 22:13:41 -04:00
|
|
|
|
2022-09-12 22:25:54 -04:00
|
|
|
@commands.command()
|
2022-09-18 22:15:34 -04:00
|
|
|
async def removecmd(self, ctx: commands.Context):
|
2022-09-12 22:25:54 -04:00
|
|
|
if not ctx.author.is_mod:
|
|
|
|
await ctx.send("lmao nice try ur not a mod")
|
|
|
|
return
|
|
|
|
|
|
|
|
args = ctx.message.content.split(" ", 1)
|
|
|
|
if len(args) < 2:
|
|
|
|
await ctx.send("lmao wut command")
|
|
|
|
return
|
|
|
|
|
|
|
|
command_name = args[1]
|
|
|
|
if self.get_command(command_name) is not None:
|
|
|
|
await ctx.send("lmao u cant delet this")
|
|
|
|
return
|
|
|
|
|
2022-09-13 23:17:05 -04:00
|
|
|
if self.db_cur.execute("SELECT * FROM custom_commands WHERE command=?", [command_name]).fetchone() is None:
|
2022-09-12 22:25:54 -04:00
|
|
|
await ctx.send("lmao wut is that command")
|
|
|
|
return
|
|
|
|
|
2022-09-13 23:00:59 -04:00
|
|
|
self.db_cur.execute("DELETE FROM custom_commands WHERE command=?", [command_name])
|
|
|
|
self.db_con.commit()
|
2022-09-12 22:25:54 -04:00
|
|
|
await ctx.send(f"Deleted command \"{PREFIX}{command_name}\"")
|
|
|
|
|
2022-09-12 23:25:48 -04:00
|
|
|
@commands.command()
|
|
|
|
async def help(self, ctx: commands.Context):
|
|
|
|
message = "Available commands:"
|
|
|
|
for command in self.commands:
|
2022-09-18 22:53:40 -04:00
|
|
|
if command not in ["addcmd", "removecmd"] or ctx.author.is_mod:
|
|
|
|
message += f" {PREFIX}{command}"
|
2022-09-13 23:17:05 -04:00
|
|
|
for command in self.db_cur.execute("SELECT command FROM custom_commands").fetchall():
|
|
|
|
message += f" {PREFIX}{command[0]}"
|
2022-09-12 23:25:48 -04:00
|
|
|
await ctx.send(message)
|
|
|
|
|
2022-11-27 23:06:02 -05:00
|
|
|
@commands.command()
|
|
|
|
async def balls(self, ctx: commands.Context):
|
|
|
|
word_list = "cubes are just balls with corners lmao".split(" ")
|
|
|
|
random.shuffle(word_list)
|
|
|
|
await ctx.send(" ".join(word_list))
|
|
|
|
|
2023-02-15 22:32:09 -05:00
|
|
|
@commands.command()
|
|
|
|
async def donation(self, ctx: commands.Context):
|
|
|
|
r = requests.get("https://taskinoz.com/gdq/api/")
|
|
|
|
await ctx.send(r.text)
|
|
|
|
|
2023-02-15 22:41:25 -05:00
|
|
|
@commands.command()
|
|
|
|
async def shuffle(self, ctx: commands.Context):
|
2024-12-27 01:40:14 -05:00
|
|
|
shuffled_message_list = self.shuffle_message.split(" ")
|
2023-02-15 22:41:25 -05:00
|
|
|
random.shuffle(shuffled_message_list)
|
|
|
|
await ctx.send(" ".join(shuffled_message_list))
|
|
|
|
|
2024-12-27 01:40:14 -05:00
|
|
|
# Requested by linguini15
|
|
|
|
@commands.command()
|
|
|
|
async def shape(self, ctx: commands.Context):
|
|
|
|
platonic_solids = ["tetrahedron", "hexahedron", "octahedron", "dodecahedron", "icosahedron"]
|
|
|
|
solid = random.choice(platonic_solids)
|
|
|
|
message = f"did you guys know the marble is a{'n' if solid[0] in ['i', 'o'] else ''} {solid}!?!?!?"
|
|
|
|
await ctx.send(message)
|
|
|
|
|
|
|
|
@commands.command()
|
|
|
|
async def dot(self, ctx: commands.Context):
|
|
|
|
responses = [
|
|
|
|
"Oh, no way! It's another door!",
|
|
|
|
"You found a... What is this? I don't even...",
|
|
|
|
"Hey, listen!",
|
|
|
|
"Owls creep me out.",
|
|
|
|
"I can't remember what that is. I forget... My memory isn't what it used to be. I can't remember...",
|
|
|
|
"Hello, what's this? You found a secret passage! Who knows where it leads? I don't.",
|
|
|
|
"I'm as confused as you are. I have no idea.",
|
|
|
|
"It looks like a book. Yup! it's a book!",
|
|
|
|
"You have found a treasure map! A map of what? to where? I don't know! Figure it out yourself!",
|
|
|
|
"I wonder what this is...",
|
|
|
|
"I wonder what this means...",
|
|
|
|
"Something like, oh, I don't know...",
|
|
|
|
"...",
|
|
|
|
"Uh?",
|
|
|
|
"What does it mean?"
|
|
|
|
]
|
|
|
|
await ctx.send(random.choice(responses))
|
|
|
|
|
|
|
|
# Requested by friendsupporter3829
|
|
|
|
@commands.command()
|
|
|
|
async def hydratecount(self, ctx: commands.Context):
|
|
|
|
await ctx.send(f"Number of hydrates this stream: {self.hydrate_count}")
|
|
|
|
|
|
|
|
# Requested by linguini15
|
|
|
|
@commands.command()
|
|
|
|
async def calc(self, ctx: commands.Context):
|
|
|
|
args = ctx.message.content.split(" ", 1)
|
|
|
|
if len(args) < 2:
|
|
|
|
await ctx.send("Requires argument to calculate") # No it doesn't lmfao
|
|
|
|
return
|
|
|
|
|
|
|
|
components_seed = random.random()
|
|
|
|
has_real = components_seed <= 0.85
|
|
|
|
has_imaginary = components_seed >= 0.6
|
|
|
|
|
|
|
|
message = ""
|
|
|
|
if has_real:
|
|
|
|
real = random.triangular(-20, 20)
|
|
|
|
message += f"{real:.2f}"
|
|
|
|
if random.random() > 0.75:
|
|
|
|
message += "π"
|
|
|
|
if has_imaginary:
|
|
|
|
imag = random.triangular(-20, 20)
|
|
|
|
if has_real:
|
|
|
|
message += f" {'+' if imag >= 0 else '-'} "
|
|
|
|
imag = abs(imag)
|
|
|
|
message += f"{imag:.2f}"
|
|
|
|
if random.random() > 0.75:
|
|
|
|
message += "π"
|
|
|
|
message += "i"
|
|
|
|
|
|
|
|
await ctx.send(message)
|
|
|
|
|
|
|
|
# TODO - !weather (requested by linguini15), !chess
|
|
|
|
|
2022-09-12 19:02:45 -04:00
|
|
|
|
|
|
|
def main():
|
|
|
|
load_dotenv()
|
|
|
|
bot = Bot()
|
|
|
|
bot.run()
|
|
|
|
# bot.run() is blocking and will stop execution of any below code here until stopped or closed.
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
main()
|