Compare commits
10 commits
d158a1cf62
...
a6a9ab5ebb
Author | SHA1 | Date | |
---|---|---|---|
a6a9ab5ebb | |||
86473c1028 | |||
84bda4043e | |||
e2a464ecb7 | |||
87c5fd1abf | |||
8cfc10c039 | |||
83cbd44204 | |||
d2939b8dee | |||
95747dbd36 | |||
70cbb85402 |
3 changed files with 209 additions and 26 deletions
10
README.md
10
README.md
|
@ -25,3 +25,13 @@ python3 bot.py
|
|||
```
|
||||
|
||||
Ask me for help if none of this makes sense lol
|
||||
|
||||
## Generating token
|
||||
|
||||
Link to generate twitch bot token:
|
||||
|
||||
https://twitchtokengenerator.com/quick/n5zYtaZFY8
|
||||
|
||||
Link to generate pubsub token:
|
||||
|
||||
https://twitchtokengenerator.com/quick/NWDIPn68zM
|
||||
|
|
220
bot.py
220
bot.py
|
@ -1,12 +1,17 @@
|
|||
# Basic twitch bot
|
||||
|
||||
import os
|
||||
import random
|
||||
import sqlite3
|
||||
import re
|
||||
from twitchio.ext import commands
|
||||
from twitchio.ext import pubsub
|
||||
from dotenv import load_dotenv
|
||||
import requests
|
||||
|
||||
PREFIX = "!"
|
||||
DB_PATH = "storage.db"
|
||||
CHESS_RE = re.compile("\\b(([KQRBN]?[a-h]?[1-8]?x?[a-h][1-8](=?[QRBNqrbn])?)|(O-O(-O)?))[+#]?\\b")
|
||||
|
||||
|
||||
class Bot(commands.Bot):
|
||||
|
@ -15,21 +20,74 @@ class Bot(commands.Bot):
|
|||
self.db_con = sqlite3.connect(DB_PATH)
|
||||
self.db_cur = self.db_con.cursor()
|
||||
|
||||
if self.db_cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='custom_commands'").fetchone() == None:
|
||||
self.db_cur.execute("CREATE TABLE custom_commands(command, output_text)")
|
||||
print("Created database table")
|
||||
else:
|
||||
commands = self.db_cur.execute("SELECT * FROM custom_commands").fetchall()
|
||||
print(f"{len(commands)} custom commands available")
|
||||
self.db_cur.execute("CREATE TABLE IF NOT EXISTS custom_commands(command TEXT, output_text TEXT, author TEXT)")
|
||||
self.db_cur.execute("CREATE TABLE IF NOT EXISTS channel_reward_messages(title TEXT, output_text TEXT)")
|
||||
print("Created database table")
|
||||
commands = self.db_cur.execute("SELECT count(*) FROM custom_commands").fetchone()
|
||||
print(f"{commands[0]} custom commands available")
|
||||
|
||||
self.shuffle_message = "cubes are just balls with corners lmao"
|
||||
|
||||
self.peer_pressure_message = ""
|
||||
self.peer_pressure_users = []
|
||||
|
||||
self.hydrate_count = 0
|
||||
|
||||
# Adds a custom command from a string
|
||||
async def add_custom_command(self, ctx, command_str, author):
|
||||
args = command_str.split(" ", 1)
|
||||
if len(args) < 2:
|
||||
await ctx.send("lmao plz supply 2 args")
|
||||
return
|
||||
command_name = args[0]
|
||||
command_text = args[1]
|
||||
|
||||
if not command_name.isalnum():
|
||||
await ctx.send("lmao that command name is too funky")
|
||||
return
|
||||
|
||||
if self.db_cur.execute("SELECT * FROM custom_commands WHERE command=?", [command_name]).fetchone() is not None \
|
||||
or self.get_command(command_name) is not None:
|
||||
await ctx.send("lmao that command already exists")
|
||||
return
|
||||
|
||||
self.db_cur.execute("INSERT INTO custom_commands VALUES (?, ?, ?)", [command_name, command_text, author])
|
||||
self.db_con.commit()
|
||||
await ctx.send(f"Adding command: \"{PREFIX}{command_name}\" -> \"{command_text}\"")
|
||||
|
||||
async def event_ready(self):
|
||||
print(f"Logged in as | {self.nick}")
|
||||
print(f"User id is | {self.user_id}")
|
||||
|
||||
# This entire pubsub thing feels so janky... There simply must be a better way to do it
|
||||
if "PUBSUB_TOKEN" in os.environ and "PUBSUB_USER_ID" in os.environ:
|
||||
self.pubsub = pubsub.PubSubPool(self)
|
||||
topics = [
|
||||
pubsub.channel_points(os.environ["PUBSUB_TOKEN"])[int(os.environ["PUBSUB_USER_ID"])]
|
||||
]
|
||||
await self.pubsub.subscribe_topics(topics)
|
||||
|
||||
# Can I put this function somewhere else LMAO
|
||||
@self.event()
|
||||
async def event_pubsub_channel_points(event: pubsub.PubSubChannelPointsMessage):
|
||||
reward_title = event.reward.title
|
||||
if reward_title == "Hydrate!": # Hack for hydrate count...
|
||||
self.hydrate_count += 1
|
||||
result = self.db_cur.execute("SELECT output_text FROM channel_reward_messages WHERE title=?", [reward_title]).fetchone()
|
||||
if result is not None:
|
||||
message = result[0]
|
||||
message = message.replace("%USER%", event.user.name)
|
||||
if event.input:
|
||||
message = message.replace("%INPUT%", event.input)
|
||||
await self.get_channel(os.environ["CHANNEL"]).send(message)
|
||||
|
||||
async def event_message(self, message):
|
||||
if message.echo:
|
||||
return
|
||||
|
||||
if "CHESS_ADDR" in os.environ and "CHESS_TOKEN" in os.environ:
|
||||
await self.handle_chess(message)
|
||||
|
||||
# I really want to do this with the add_command function and have no need for this event_message override, but
|
||||
# I cannot for the life of me figure out how to make "anonymous" coroutines (like, async lambda or something),
|
||||
# so I'm just manually handling the command here before passing to the command handler
|
||||
|
@ -37,11 +95,51 @@ class Bot(commands.Bot):
|
|||
first_token = message.content[len(PREFIX):].strip().split()[0]
|
||||
res = self.db_cur.execute("SELECT output_text FROM custom_commands WHERE command=?", [first_token]).fetchone()
|
||||
if res is not None:
|
||||
await message.channel.send(res[0])
|
||||
output = res[0]
|
||||
output = output.replace("%USER%", message.author.name)
|
||||
await message.channel.send(output)
|
||||
return
|
||||
|
||||
content = message.content.replace(u"\U000E0000", "").strip()
|
||||
if content == self.peer_pressure_message:
|
||||
if not message.author.name in self.peer_pressure_users:
|
||||
self.peer_pressure_users.append(message.author.name)
|
||||
if len(self.peer_pressure_users) >= 3:
|
||||
await message.channel.send(content)
|
||||
self.peer_pressure_users = []
|
||||
else:
|
||||
self.peer_pressure_message = content
|
||||
self.peer_pressure_users = [message.author.name]
|
||||
|
||||
await self.handle_commands(message)
|
||||
|
||||
if len(content.split(" ")) >= 3:
|
||||
self.shuffle_message = content
|
||||
|
||||
async def handle_chess(self, message):
|
||||
# Don't do this if this is already a chess command (e.g. likely to have false positives when entering FEN)
|
||||
if message.content.startswith(PREFIX) and message.content[len(PREFIX):].strip().split()[0] == "chess":
|
||||
return
|
||||
|
||||
# Check if the current message contains a chess move and play it
|
||||
move_match = CHESS_RE.search(message.content)
|
||||
if move_match is not None:
|
||||
move = move_match.group()
|
||||
headers = {"Authorization": f"Bearer {os.environ['CHESS_TOKEN']}"}
|
||||
params = {"chatter": message.author.name, "fullMessage": message.content}
|
||||
raw_response = requests.post(f"{os.environ['CHESS_ADDR']}/api/move/{move}", params=params, headers=headers)
|
||||
response = raw_response.json()
|
||||
if "error" in response:
|
||||
await message.channel.send(f"Error: {response['error']}")
|
||||
else:
|
||||
output = f"Playing move {response['move']['san']}"
|
||||
if response["draw"]:
|
||||
output += ", game ends in a draw!"
|
||||
elif response["checkmate"]:
|
||||
output += f", {'black' if response['move']['color'] == 'b' else 'white'} wins by checkmate!"
|
||||
output += " !chess"
|
||||
await message.channel.send(output)
|
||||
|
||||
@commands.command()
|
||||
async def hello(self, ctx: commands.Context):
|
||||
# Basic hello world command, executed with "!hello". Reply hello to whoever made the command
|
||||
|
@ -59,31 +157,21 @@ class Bot(commands.Bot):
|
|||
await ctx.send(response.strip())
|
||||
|
||||
@commands.command()
|
||||
async def addcommand(self, ctx: commands.Context):
|
||||
async def addcmd(self, ctx: commands.Context):
|
||||
# Lets moderators add custom text responses
|
||||
if not ctx.author.is_mod:
|
||||
await ctx.send("lmao nice try ur not a mod")
|
||||
return
|
||||
# Not sure if ctx.args is supposed to work but it seems like it doesn't...
|
||||
# I want the last arg to not get split anyway, so I do it myself
|
||||
args = ctx.message.content.split(" ", 2)
|
||||
if len(args) < 3:
|
||||
await ctx.send("lmao plz supply 3 args")
|
||||
args = ctx.message.content.split(" ", 1)
|
||||
if len(args) < 2:
|
||||
await ctx.send("lmao plz supply 2 args")
|
||||
return
|
||||
command_name = args[1]
|
||||
command_text = args[2]
|
||||
|
||||
if self.db_cur.execute("SELECT * FROM custom_commands WHERE command=?", [command_name]).fetchone() is not None \
|
||||
or self.get_command(command_name) is not None:
|
||||
await ctx.send("lmao that command already exists")
|
||||
return
|
||||
|
||||
self.db_cur.execute("INSERT INTO custom_commands VALUES (?, ?)", [command_name, command_text])
|
||||
self.db_con.commit()
|
||||
await ctx.send(f"Adding command: \"{PREFIX}{command_name}\" -> \"{command_text}\"")
|
||||
await self.add_custom_command(ctx, args[1], ctx.author.name)
|
||||
|
||||
@commands.command()
|
||||
async def removecommand(self, ctx: commands.Context):
|
||||
async def removecmd(self, ctx: commands.Context):
|
||||
if not ctx.author.is_mod:
|
||||
await ctx.send("lmao nice try ur not a mod")
|
||||
return
|
||||
|
@ -110,11 +198,95 @@ class Bot(commands.Bot):
|
|||
async def help(self, ctx: commands.Context):
|
||||
message = "Available commands:"
|
||||
for command in self.commands:
|
||||
message += f" {PREFIX}{command}"
|
||||
if command not in ["addcmd", "removecmd"] or ctx.author.is_mod:
|
||||
message += f" {PREFIX}{command}"
|
||||
for command in self.db_cur.execute("SELECT command FROM custom_commands").fetchall():
|
||||
message += f" {PREFIX}{command[0]}"
|
||||
await ctx.send(message)
|
||||
|
||||
@commands.command()
|
||||
async def balls(self, ctx: commands.Context):
|
||||
word_list = "cubes are just balls with corners lmao".split(" ")
|
||||
random.shuffle(word_list)
|
||||
await ctx.send(" ".join(word_list))
|
||||
|
||||
@commands.command()
|
||||
async def donation(self, ctx: commands.Context):
|
||||
r = requests.get("https://taskinoz.com/gdq/api/")
|
||||
await ctx.send(r.text)
|
||||
|
||||
@commands.command()
|
||||
async def shuffle(self, ctx: commands.Context):
|
||||
shuffled_message_list = self.shuffle_message.split(" ")
|
||||
random.shuffle(shuffled_message_list)
|
||||
await ctx.send(" ".join(shuffled_message_list))
|
||||
|
||||
# Requested by linguini15
|
||||
@commands.command()
|
||||
async def shape(self, ctx: commands.Context):
|
||||
platonic_solids = ["tetrahedron", "hexahedron", "octahedron", "dodecahedron", "icosahedron"]
|
||||
solid = random.choice(platonic_solids)
|
||||
message = f"did you guys know the marble is a{'n' if solid[0] in ['i', 'o'] else ''} {solid}!?!?!?"
|
||||
await ctx.send(message)
|
||||
|
||||
@commands.command()
|
||||
async def dot(self, ctx: commands.Context):
|
||||
responses = [
|
||||
"Oh, no way! It's another door!",
|
||||
"You found a... What is this? I don't even...",
|
||||
"Hey, listen!",
|
||||
"Owls creep me out.",
|
||||
"I can't remember what that is. I forget... My memory isn't what it used to be. I can't remember...",
|
||||
"Hello, what's this? You found a secret passage! Who knows where it leads? I don't.",
|
||||
"I'm as confused as you are. I have no idea.",
|
||||
"It looks like a book. Yup! it's a book!",
|
||||
"You have found a treasure map! A map of what? to where? I don't know! Figure it out yourself!",
|
||||
"I wonder what this is...",
|
||||
"I wonder what this means...",
|
||||
"Something like, oh, I don't know...",
|
||||
"...",
|
||||
"Uh?",
|
||||
"What does it mean?"
|
||||
]
|
||||
await ctx.send(random.choice(responses))
|
||||
|
||||
# Requested by friendsupporter3829
|
||||
@commands.command()
|
||||
async def hydratecount(self, ctx: commands.Context):
|
||||
await ctx.send(f"Number of hydrates this stream: {self.hydrate_count}")
|
||||
|
||||
# Requested by linguini15
|
||||
@commands.command()
|
||||
async def calc(self, ctx: commands.Context):
|
||||
args = ctx.message.content.split(" ", 1)
|
||||
if len(args) < 2:
|
||||
await ctx.send("Requires argument to calculate") # No it doesn't lmfao
|
||||
return
|
||||
|
||||
components_seed = random.random()
|
||||
has_real = components_seed <= 0.85
|
||||
has_imaginary = components_seed >= 0.6
|
||||
|
||||
message = ""
|
||||
if has_real:
|
||||
real = random.triangular(-20, 20)
|
||||
message += f"{real:.2f}"
|
||||
if random.random() > 0.75:
|
||||
message += "π"
|
||||
if has_imaginary:
|
||||
imag = random.triangular(-20, 20)
|
||||
if has_real:
|
||||
message += f" {'+' if imag >= 0 else '-'} "
|
||||
imag = abs(imag)
|
||||
message += f"{imag:.2f}"
|
||||
if random.random() > 0.75:
|
||||
message += "π"
|
||||
message += "i"
|
||||
|
||||
await ctx.send(message)
|
||||
|
||||
# TODO - !weather (requested by linguini15), !chess
|
||||
|
||||
|
||||
def main():
|
||||
load_dotenv()
|
||||
|
|
|
@ -1,2 +1,3 @@
|
|||
twitchio==2.4.0
|
||||
python-dotenv==0.21.0
|
||||
twitchio==2.10.0
|
||||
python-dotenv==1.0.1
|
||||
requests==2.32.3
|
||||
|
|
Loading…
Add table
Reference in a new issue