import asyncio
import hashlib
import hmac
import os  # required by every os.getenv() call below; harmless if already imported above this section
import random
import sqlite3
import tarfile
import tempfile
import time
from typing import Any, Dict, Optional, Tuple
from urllib.parse import parse_qsl

import docker
import requests
from docker.errors import APIError, NotFound
from dotenv import load_dotenv
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.background import BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, JSONResponse

load_dotenv()


# =========================
# CONFIG
# =========================
def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment.

    A missing or blank variable yields ``default``; anything that is not an
    integer raises ``RuntimeError`` naming the offending variable, so a typo
    in ``.env`` is reported clearly at import time.
    """
    raw = os.getenv(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        raise RuntimeError(f"{name} must be an integer, got {raw!r}") from None


def _env_float(name: str, default: float) -> float:
    """Read a float setting from the environment (same rules as ``_env_int``)."""
    raw = os.getenv(name, "").strip()
    if not raw:
        return default
    try:
        return float(raw)
    except ValueError:
        raise RuntimeError(f"{name} must be a number, got {raw!r}") from None


BOT_TOKEN = os.getenv("BOT_TOKEN", "").strip()
PUBLIC_IP = os.getenv("PUBLIC_IP", "").strip()
DB_PATH = os.getenv("DB_PATH", "mchost.db").strip()
DOCKER_IMAGE = os.getenv("DOCKER_IMAGE", "itzg/minecraft-server").strip()

# Comma-separated Telegram user ids; non-numeric entries are ignored.
ADMIN_IDS = {
    int(x.strip())
    for x in os.getenv("ADMIN_IDS", "").split(",")
    if x.strip().isdigit()
}

FREE_MINUTES = _env_int("FREE_MINUTES", 120)

# Inclusive host-port range from which server ports are drawn.
PORT_MIN = _env_int("PORT_MIN", 25570)
PORT_MAX = _env_int("PORT_MAX", 25650)
if PORT_MIN > PORT_MAX:
    # random.randint(PORT_MIN, PORT_MAX) would otherwise fail on first use.
    raise RuntimeError(f"PORT_MIN ({PORT_MIN}) must not exceed PORT_MAX ({PORT_MAX})")

MEM_LIMIT = os.getenv("MEM_LIMIT", "1g")
CPU_LIMIT = _env_float("CPU_LIMIT", 1.0)

MC_VERSIONS = [
    v.strip()
    for v in os.getenv("MC_VERSIONS", "1.21.1,1.20.4,1.19.4,1.18.2,1.16.5").split(",")
    if v.strip()
]

WORLD_DOWNLOAD_PRICE_XTR = _env_int("WORLD_DOWNLOAD_PRICE_XTR", 25)
PRICE_EXTEND_60_XTR = _env_int("PRICE_EXTEND_60_XTR", 25)

# Telegram webhook security (optional but recommended)
TELEGRAM_WEBHOOK_SECRET = os.getenv("TELEGRAM_WEBHOOK_SECRET", "").strip()

# Docker world folder inside itzg/minecraft-server:
# Usually /data/world. If you use a different layout, change here.
WORLD_DIR_IN_CONTAINER = os.getenv("WORLD_DIR_IN_CONTAINER", "/data/world").strip()

# CORS
# Entries are stripped and blanks dropped so "https://a.example, https://b.example"
# yields two usable origins; an empty setting falls back to the wildcard.
ALLOW_ORIGINS = [
    origin.strip()
    for origin in os.getenv("ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

if not BOT_TOKEN or not PUBLIC_IP:
    raise RuntimeError("BOT_TOKEN / PUBLIC_IP missing in .env")

BOT_API = f"https://api.telegram.org/bot{BOT_TOKEN}"

docker_client = docker.from_env()

app = FastAPI(title="ServerCraft WebApp API")

_ALLOW_ALL_ORIGINS = "*" in ALLOW_ORIGINS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if _ALLOW_ALL_ORIGINS else ALLOW_ORIGINS,
    # Credentialed CORS must not be combined with a wildcard origin. Every
    # endpoint in this file authenticates through the X-Init-Data header, so
    # credentials are only enabled when explicit origins are configured.
    allow_credentials=not _ALLOW_ALL_ORIGINS,
    allow_methods=["*"],
    allow_headers=["*"],
)


# =========================
# Telegram WebApp initData verify
# =========================
def verify_telegram_init_data(init_data: str, max_age_seconds: int = 24 * 3600) -> Dict[str, str]:
    """Verify Telegram WebApp initData and return its fields (without ``hash``).

    Verification scheme:
        secret_key      = HMAC_SHA256("WebAppData", bot_token)
        calculated_hash = HMAC_SHA256(secret_key, data_check_string)

    Args:
        init_data: Raw query-string formatted initData.
        max_age_seconds: Maximum accepted age of ``auth_date``.

    Raises:
        HTTPException: 401 when initData is missing, malformed, too old or
            carries a hash that does not match.
    """
    if not init_data:
        raise HTTPException(401, "Missing initData")

    data = dict(parse_qsl(init_data, keep_blank_values=True))
    received_hash = data.pop("hash", None)
    if not received_hash:
        raise HTTPException(401, "Missing hash")

    auth_date_raw = data.get("auth_date")
    # isascii() guards against characters such as "²" that pass isdigit()
    # but make int() raise, which would surface as a 500.
    if not auth_date_raw or not (auth_date_raw.isascii() and auth_date_raw.isdigit()):
        raise HTTPException(401, "Missing auth_date")

    if int(time.time()) - int(auth_date_raw) > max_age_seconds:
        raise HTTPException(401, "initData too old")

    data_check_string = "\n".join(f"{k}={v}" for k, v in sorted(data.items()))
    secret_key = hmac.new(b"WebAppData", BOT_TOKEN.encode(), hashlib.sha256).digest()
    calculated_hash = hmac.new(secret_key, data_check_string.encode(), hashlib.sha256).hexdigest()

    # Compare as bytes: compare_digest() raises TypeError for non-ASCII str,
    # and received_hash is attacker-controlled.
    if not hmac.compare_digest(calculated_hash.encode(), received_hash.encode()):
        raise HTTPException(401, "Bad initData hash")

    return data


def parse_user_id_from_init_data(init_data: str) -> int:
    """Verify ``init_data`` and return the Telegram user id it carries.

    Raises:
        HTTPException: 401 when verification fails or the ``user`` field is
            absent or not a JSON object with an integer-like ``id``.
    """
    import json

    data = verify_telegram_init_data(init_data)
    user_json = data.get("user", "")
    if not user_json:
        raise HTTPException(401, "No user in initData")
    try:
        return int(json.loads(user_json)["id"])
    except (ValueError, KeyError, TypeError):
        # ValueError covers invalid JSON as well as a non-numeric id.
        raise HTTPException(401, "Bad user in initData") from None


def is_admin(user_id: int) -> bool:
    """Return True when ``user_id`` is listed in ADMIN_IDS."""
    return user_id in ADMIN_IDS


# =========================
# DB helpers (WAL safe)
# =========================
class _ClosingConnection(sqlite3.Connection):
    """sqlite3 connection whose ``with`` block also closes it.

    The stock context manager only commits or rolls back; it leaves the
    connection open. Every helper below opens a fresh connection, so without
    this each call would leak a handle until garbage collection.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


def _connect():
    """Open a configured connection to DB_PATH.

    Intended use is ``with _connect() as con:`` which commits on success,
    rolls back on error and then closes the connection.
    """
    con = sqlite3.connect(DB_PATH, timeout=30, factory=_ClosingConnection)
    con.execute("PRAGMA journal_mode=WAL;")
    con.execute("PRAGMA synchronous=NORMAL;")
    con.execute("PRAGMA foreign_keys=ON;")
    return con


def db_init():
    """Create the users, servers and payments tables and their indexes if absent."""
    with _connect() as con:
        con.execute("""
            CREATE TABLE IF NOT EXISTS users (
                user_id INTEGER PRIMARY KEY,
                first_seen INTEGER NOT NULL
            )""")
        con.execute("""
            CREATE TABLE IF NOT EXISTS servers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                container_name TEXT NOT NULL UNIQUE,
                host_port INTEGER NOT NULL,
                created_at INTEGER NOT NULL,
                expires_at INTEGER NOT NULL,
                status TEXT NOT NULL,
                mc_version TEXT NOT NULL DEFAULT 'latest'
            )""")
        con.execute("""
            CREATE TABLE IF NOT EXISTS payments (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                server_id INTEGER NOT NULL,
                purpose TEXT NOT NULL,
                payload TEXT NOT NULL UNIQUE,
                telegram_payment_charge_id TEXT UNIQUE,
                amount_xtr INTEGER NOT NULL,
                created_at INTEGER NOT NULL
            )""")
        con.execute("CREATE INDEX IF NOT EXISTS idx_servers_user_id ON servers(user_id)")
        con.execute("CREATE INDEX IF NOT EXISTS idx_payments_payload ON payments(payload)")
        con.execute("CREATE INDEX IF NOT EXISTS idx_payments_user_server ON payments(user_id, server_id)")


def db_register_user(user_id: int):
    """Record ``user_id`` with the current time unless it is already known."""
    with _connect() as con:
        con.execute(
            "INSERT OR IGNORE INTO users(user_id, first_seen) VALUES(?, ?)",
            (user_id, int(time.time())),
        )


def db_get_user_server(user_id: int) -> Optional[Tuple]:
    """Return the user's newest server row, or None.

    Row layout: (id, user_id, container_name, host_port, mc_version,
    created_at, expires_at, status).
    """
    with _connect() as con:
        cur = con.execute(
            "SELECT id, user_id, container_name, host_port, mc_version, created_at, expires_at, status "
            "FROM servers WHERE user_id=? ORDER BY id DESC LIMIT 1",
            (user_id,),
        )
        return cur.fetchone()


def db_get_server_by_id(server_id: int) -> Optional[Tuple]:
    """Return the server row with this id (same layout as db_get_user_server), or None."""
    with _connect() as con:
        cur = con.execute(
            "SELECT id, user_id, container_name, host_port, mc_version, created_at, expires_at, status "
            "FROM servers WHERE id=?",
            (server_id,),
        )
        return cur.fetchone()


def db_insert_server(user_id: int, container_name: str, host_port: int, mc_version: str, expires_at: int):
    """Insert a new server row with status "running" and created_at = now."""
    now = int(time.time())
    with _connect() as con:
        con.execute(
            "INSERT INTO servers(user_id, container_name, host_port, mc_version, created_at, expires_at, status) "
            "VALUES(?,?,?,?,?,?,?)",
            (user_id, container_name, host_port, mc_version, now, expires_at, "running"),
        )


def db_update_status(server_id: int, status: str):
    """Set the stored status of a server."""
    with _connect() as con:
        con.execute("UPDATE servers SET status=? WHERE id=?", (status, server_id))


def db_update_expiry(server_id: int, expires_at: int):
    """Set the stored expiry timestamp of a server."""
    with _connect() as con:
        con.execute("UPDATE servers SET expires_at=? WHERE id=?", (expires_at, server_id))


def db_delete_server(server_id: int):
    """Delete a server row."""
    with _connect() as con:
        con.execute("DELETE FROM servers WHERE id=?", (server_id,))


def db_create_payment(user_id: int, server_id: int, purpose: str, payload: str, amount_xtr: int):
    """Insert an unpaid payment row (telegram_payment_charge_id is NULL).

    Raises:
        sqlite3.IntegrityError: if ``payload`` already exists (UNIQUE column).
    """
    now = int(time.time())
    with _connect() as con:
        con.execute(
            "INSERT INTO payments(user_id, server_id, purpose, payload, telegram_payment_charge_id, amount_xtr, created_at) "
            "VALUES(?,?,?,?,?,?,?)",
            (user_id, server_id, purpose, payload, None, int(amount_xtr), now),
        )


def db_mark_payment_paid(payload: str, telegram_payment_charge_id: str):
    """Store the Telegram charge id on the payment identified by ``payload``."""
    with _connect() as con:
        con.execute(
            "UPDATE payments SET telegram_payment_charge_id=? WHERE payload=?",
            (telegram_payment_charge_id, payload),
        )


def db_has_paid(user_id: int, server_id: int, purpose: str) -> bool:
    """Return True when a paid payment exists for this user, server and purpose."""
    with _connect() as con:
        cur = con.execute(
            "SELECT 1 FROM payments "
            "WHERE user_id=? AND server_id=? AND purpose=? "
            "AND telegram_payment_charge_id IS NOT NULL "
            "ORDER BY id DESC LIMIT 1",
            (user_id, server_id, purpose),
        )
        return cur.fetchone() is not None


# =========================
# Docker helpers
# =========================
def pick_free_port() -> int:
    """Pick a random port in [PORT_MIN, PORT_MAX] not assigned to a live server.

    Ports of unexpired "running"/"stopped" server rows are excluded. Docker
    only holds a port while the container runs, so without this check a
    stopped server's port could be handed to another user.

    Raises:
        RuntimeError: when every port in the range is taken.
    """
    now = int(time.time())
    with _connect() as con:
        rows = con.execute(
            "SELECT host_port FROM servers "
            "WHERE expires_at > ? AND status IN ('running', 'stopped')",
            (now,),
        ).fetchall()
    reserved = {row[0] for row in rows}
    candidates = [p for p in range(PORT_MIN, PORT_MAX + 1) if p not in reserved]
    if not candidates:
        raise RuntimeError("No free ports left in the configured range")
    return random.choice(candidates)


def container_name_for(user_id: int) -> str:
    """Return the deterministic container name for a user."""
    return f"mc_user_{user_id}"


def docker_remove_if_exists(name: str):
    """Force-remove the named container; do nothing if it does not exist."""
    try:
        docker_client.containers.get(name).remove(force=True)
    except NotFound:
        return


def docker_status(container_name: str) -> str:
    """Return the container's current Docker status, or "missing"."""
    try:
        c = docker_client.containers.get(container_name)
        c.reload()
        return c.status
    except NotFound:
        return "missing"


def docker_start_server(user_id: int, mc_version: str) -> int:
    """Replace the user's container with a fresh one and return its host port.

    Tries up to 25 candidate ports. After a failed attempt the half-created
    container is removed: a failed start can leave a container behind that
    still owns the name, which would make every further attempt fail with a
    name conflict.

    Raises:
        RuntimeError: when no attempt succeeds or cleanup fails.
    """
    name = container_name_for(user_id)
    docker_remove_if_exists(name)

    last_err = None
    for _ in range(25):
        host_port = pick_free_port()
        try:
            docker_client.containers.run(
                image=DOCKER_IMAGE,
                name=name,
                detach=True,
                environment={
                    "EULA": "TRUE",
                    "VERSION": mc_version or "latest",
                    "ONLINE_MODE": "TRUE",
                    "DIFFICULTY": "easy",
                    "MAX_PLAYERS": "10",
                },
                ports={"25565/tcp": host_port},
                mem_limit=MEM_LIMIT,
                nano_cpus=int(CPU_LIMIT * 1e9),
                restart_policy={"Name": "no"},
            )
            return host_port
        except APIError as e:
            last_err = e
            try:
                docker_remove_if_exists(name)
            except APIError as cleanup_err:
                raise RuntimeError(f"Failed to start container. Last error: {e}") from cleanup_err

    raise RuntimeError(f"Failed to start container. Last error: {last_err}")


def docker_stop_server(container_name: str):
    """Stop the container, allowing 10 seconds before it is killed."""
    docker_client.containers.get(container_name).stop(timeout=10)


def docker_start_existing(container_name: str):
    """Start an existing container."""
    docker_client.containers.get(container_name).start()


def docker_delete_server(container_name: str):
    """Force-remove the container; a missing container is not an error."""
    docker_remove_if_exists(container_name)


def docker_exec(container_name: str, cmd: str):
    """Run ``cmd`` inside the container and return its combined output.

    Raises:
        RuntimeError: with the command output when the exit code is non-zero.
    """
    c = docker_client.containers.get(container_name)
    res = c.exec_run(cmd, stdout=True, stderr=True)
    if res.exit_code != 0:
        if isinstance(res.output, (bytes, bytearray)):
            out = res.output.decode(errors="ignore")
        else:
            out = str(res.output)
        raise RuntimeError(out)
    return res.output


def docker_copy_from(container_name: str, src_path: str, dst_path: str):
    """Copy one file out of a container to ``dst_path``.

    get_archive returns a TAR stream. The stream is written to a temporary
    ``<dst_path>.tar``, the first regular file in it is copied to
    ``dst_path`` in chunks, and the temporary tar is removed even on failure.

    Raises:
        RuntimeError: when the archive is empty or contains no regular file.
    """
    import shutil

    c = docker_client.containers.get(container_name)
    stream, _stat = c.get_archive(src_path)

    tmp_tar = dst_path + ".tar"
    try:
        with open(tmp_tar, "wb") as f:
            for chunk in stream:
                f.write(chunk)

        with tarfile.open(tmp_tar, "r") as tf:
            members = tf.getmembers()
            if not members:
                raise RuntimeError("Empty archive from container")
            member = next((m for m in members if m.isfile()), None)
            extracted = tf.extractfile(member) if member is not None else None
            if extracted is None:
                raise RuntimeError("Cannot extract file from tar")
            with extracted, open(dst_path, "wb") as out:
                # Chunked copy: world archives can be large.
                shutil.copyfileobj(extracted, out)
    finally:
        if os.path.exists(tmp_tar):
            os.remove(tmp_tar)


# =========================
# Helpers
# =========================
def fmt_time_left(expires_at: int) -> str:
    """Format the time remaining until ``expires_at`` as "Hh Mm" or "Mm"."""
    seconds_left = max(0, expires_at - int(time.time()))
    hrs, mins = divmod(seconds_left // 60, 60)
    return f"{hrs}h {mins}m" if hrs > 0 else f"{mins}m"


def alive_server_row(row: Tuple) -> bool:
    """Return True when a server row is unexpired and "running" or "stopped"."""
    expires_at = int(row[6])
    status = row[7]
    return int(time.time()) < expires_at and status in ("running", "stopped")


# =========================
# Telegram Stars invoices
# =========================
def tg_create_invoice_link(title: str, description: str, payload: str, amount_xtr: int) -> str:
    """Create a Telegram Stars (XTR) invoice link and return it.

    Raises:
        RuntimeError: on a network error, a non-2xx reply, a non-JSON body or
            a Telegram response with ``ok`` false.
    """
    body = {
        "title": title,
        "description": description,
        "payload": payload,
        "currency": "XTR",
        "prices": [{"label": title, "amount": int(amount_xtr)}],
        "provider_token": "",
    }
    try:
        r = requests.post(f"{BOT_API}/createInvoiceLink", json=body, timeout=15)
    except requests.RequestException as e:
        raise RuntimeError(f"Telegram createInvoiceLink failed: {e}") from e
    if not r.ok:
        raise RuntimeError(f"Telegram createInvoiceLink failed: {r.text}")
    try:
        data = r.json()
    except ValueError as e:
        raise RuntimeError(f"Telegram createInvoiceLink failed: {r.text}") from e
    if not data.get("ok"):
        raise RuntimeError(f"Telegram createInvoiceLink not ok: {data}")
    return data["result"]


def tg_answer_pre_checkout(pre_checkout_query_id: str, ok: bool, error_message: str = ""):
    """Answer a pre-checkout query; return True when Telegram accepted the call.

    Network errors are reported as False rather than raised so that a
    transient Telegram failure does not hard-fail the webhook.
    """
    payload = {"pre_checkout_query_id": pre_checkout_query_id, "ok": ok}
    if not ok and error_message:
        payload["error_message"] = error_message
    try:
        r = requests.post(f"{BOT_API}/answerPreCheckoutQuery", json=payload, timeout=15)
    except requests.RequestException:
        return False
    return r.ok


# =========================
# Startup
# =========================
@app.on_event("startup")
def _startup():
    """Create the database schema when the application starts."""
    db_init()


# =========================
# Public config
# =========================
@app.get("/api/config")
def api_config():
    """Return the public settings (free time, address, versions, prices)."""
    return {
        "free_minutes": FREE_MINUTES,
        "public_ip": PUBLIC_IP,
        "mc_versions": MC_VERSIONS,
        "price_extend_60_xtr": PRICE_EXTEND_60_XTR,
        "price_world_xtr": WORLD_DOWNLOAD_PRICE_XTR,
    }


# =========================
# "Me" - user + server
# =========================
@app.get("/api/me")
def api_me(x_init_data: str = Header(default="")):
    """Return the caller's id, admin flag and live server (or None)."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    resp: Dict[str, Any] = {"user_id": user_id, "is_admin": is_admin(user_id), "server": None}

    s = db_get_user_server(user_id)
    if s and alive_server_row(s):
        server_id, _, cname, port, mc_version, _created_at, expires_at, status = s
        resp["server"] = {
            "id": server_id,
            "addr": f"{PUBLIC_IP}:{port}",
            "mc_version": mc_version,
            "status_db": status,
            "docker": docker_status(cname),
            "expires_at": expires_at,
            "time_left": fmt_time_left(expires_at),
        }
    return resp


# =========================
# Server lifecycle endpoints
# =========================

# Standard-library modules used only by this section; imported here so the
# section does not depend on the file header listing them.
import logging
import secrets
import shlex
import shutil

logger = logging.getLogger(__name__)

# Fixed location of the temporary world archive inside the container.
_WORLD_ARCHIVE_IN_CONTAINER = "/tmp/world.tar.gz"


def _require_owned_server(payload: Dict[str, Any], user_id: int, *, must_be_alive: bool = False) -> Tuple:
    """Return the server row named by ``payload["server_id"]`` if the user owns it.

    Raises:
        HTTPException: 400 for a non-numeric server_id, 404 when the server
            does not exist or belongs to someone else, and 400
            ("Server expired") when ``must_be_alive`` is set and the row is
            not alive.
    """
    try:
        server_id = int(payload.get("server_id") or 0)
    except (TypeError, ValueError):
        # Previously surfaced as an unhandled 500.
        raise HTTPException(400, "Bad server_id") from None

    row = db_get_server_by_id(server_id)
    if not row or row[1] != user_id:
        raise HTTPException(404, "Server not found")
    if must_be_alive and not alive_server_row(row):
        raise HTTPException(400, "Server expired")
    return row


@app.post("/api/server/create")
async def api_create_server(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Create and start a server for the caller with FREE_MINUTES of lifetime.

    Raises:
        HTTPException: 400 for an unsupported mc_version, 409 when the caller
            already has a live server, 500 when the container cannot be
            started or the server row cannot be stored.
    """
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    mc_version = str(payload.get("mc_version") or "").strip()
    if mc_version not in MC_VERSIONS:
        raise HTTPException(400, "Bad mc_version")

    existing = db_get_user_server(user_id)
    if existing and alive_server_row(existing):
        raise HTTPException(409, "Server already exists")

    try:
        host_port = await asyncio.to_thread(docker_start_server, user_id, mc_version)
    except Exception as e:
        raise HTTPException(500, f"Failed to create server: {e}")

    expires_at = int(time.time()) + FREE_MINUTES * 60
    cname = container_name_for(user_id)
    try:
        if existing:
            # container_name is UNIQUE and derived from the user id, so the
            # stale row of an expired server must go before the new insert;
            # otherwise re-creating a server always fails.
            db_delete_server(existing[0])
        db_insert_server(user_id, cname, host_port, mc_version, expires_at)
    except sqlite3.Error as e:
        # Do not leave a running container that no row tracks.
        await asyncio.to_thread(docker_delete_server, cname)
        raise HTTPException(500, f"Failed to create server: {e}") from e

    return {
        "ok": True,
        "server": {
            "addr": f"{PUBLIC_IP}:{host_port}",
            "mc_version": mc_version,
            "docker": await asyncio.to_thread(docker_status, cname),
            "expires_at": expires_at,
            "time_left": fmt_time_left(expires_at),
        },
    }


@app.post("/api/server/stop")
async def api_stop_server(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Stop the caller's server container and mark the row "stopped"."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    row = _require_owned_server(payload, user_id)
    server_id, cname = row[0], row[2]
    try:
        await asyncio.to_thread(docker_stop_server, cname)
        db_update_status(server_id, "stopped")
    except Exception as e:
        raise HTTPException(500, f"Stop error: {e}")
    return {"ok": True}


@app.post("/api/server/start")
async def api_start_server(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Start the caller's existing, unexpired server and mark the row "running"."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    row = _require_owned_server(payload, user_id, must_be_alive=True)
    server_id, cname = row[0], row[2]
    try:
        await asyncio.to_thread(docker_start_existing, cname)
        db_update_status(server_id, "running")
    except Exception as e:
        raise HTTPException(500, f"Start error: {e}")
    return {"ok": True}


@app.post("/api/server/delete")
async def api_delete_server(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Remove the caller's server container and delete its row."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    row = _require_owned_server(payload, user_id)
    await asyncio.to_thread(docker_delete_server, row[2])
    db_delete_server(row[0])
    return {"ok": True}


@app.post("/api/server/status")
async def api_status(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Return stored and live (Docker) status of the caller's server."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    server_id, _, cname, port, mc_version, _created_at, expires_at, status = _require_owned_server(
        payload, user_id
    )
    return {
        "id": server_id,
        "addr": f"{PUBLIC_IP}:{port}",
        "mc_version": mc_version,
        "status_db": status,
        "docker": await asyncio.to_thread(docker_status, cname),
        "expires_at": expires_at,
        "time_left": fmt_time_left(expires_at),
    }


# =========================
# Payments endpoints (create invoice links)
# =========================
def _create_invoice(
    *,
    user_id: int,
    server_id: int,
    purpose: str,
    payload_prefix: str,
    amount_xtr: int,
    title: str,
    description: str,
) -> Dict[str, Any]:
    """Record an unpaid payment and return a response carrying its invoice link.

    The invoice payload is ``payload_prefix`` followed by the current time
    and a random 64-bit hex nonce, which keeps the UNIQUE ``payload`` column
    collision-free without a blind retry.

    Raises:
        HTTPException: 502 when Telegram does not return an invoice link.
    """
    inv_payload = f"{payload_prefix}:{int(time.time())}:{secrets.token_hex(8)}"
    db_create_payment(user_id, server_id, purpose, inv_payload, amount_xtr)
    try:
        link = tg_create_invoice_link(
            title=title,
            description=description,
            payload=inv_payload,
            amount_xtr=amount_xtr,
        )
    except (RuntimeError, requests.RequestException) as e:
        logger.warning("Invoice link creation failed for %s: %s", inv_payload, e)
        raise HTTPException(502, "Failed to create invoice link") from e
    return {"ok": True, "invoice_link": link}


@app.post("/api/pay/extend60")
def api_pay_extend60(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Return an invoice link that adds 60 minutes to the caller's live server."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    server_id = _require_owned_server(payload, user_id, must_be_alive=True)[0]
    return _create_invoice(
        user_id=user_id,
        server_id=server_id,
        purpose="extend",
        # The webhook reads the minutes from the fourth field.
        payload_prefix=f"extend:{server_id}:{user_id}:60",
        amount_xtr=PRICE_EXTEND_60_XTR,
        title="Extend server +60 min",
        description="Adds 60 minutes to your Minecraft server.",
    )


@app.post("/api/pay/world")
def api_pay_world(payload: Dict[str, Any], x_init_data: str = Header(default="")):
    """Return an invoice link that unlocks the world download for the caller's live server."""
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    server_id = _require_owned_server(payload, user_id, must_be_alive=True)[0]
    return _create_invoice(
        user_id=user_id,
        server_id=server_id,
        purpose="world_export",
        payload_prefix=f"world:{server_id}:{user_id}",
        amount_xtr=WORLD_DOWNLOAD_PRICE_XTR,
        title="Download world",
        description="Creates an archive of your server world for download.",
    )


# =========================
# Telegram webhook (Stars payments)
# =========================
def _extend_minutes(invoice_payload: str) -> int:
    """Return the minutes encoded in an "extend:" payload, defaulting to 60.

    Payload formats:
        extend:{server_id}:{user_id}:60:{timestamp}:{nonce}
        world:{server_id}:{user_id}:{timestamp}:{nonce}
    """
    parts = invoice_payload.split(":")
    if len(parts) > 3 and parts[3].isascii() and parts[3].isdigit():
        return int(parts[3])
    return 60


def _apply_successful_payment(invoice_payload: str, charge_id: str) -> bool:
    """Mark a payment paid and apply its effect exactly once.

    The charge id is written only while the row is still unpaid, so a
    re-delivered update changes nothing and cannot extend a server twice.
    Marking and extension run in one transaction.

    Returns:
        True when this call claimed the payment, False when the payload is
        unknown or was already paid.
    """
    with _connect() as con:
        claimed = con.execute(
            "UPDATE payments SET telegram_payment_charge_id=? "
            "WHERE payload=? AND telegram_payment_charge_id IS NULL",
            (charge_id, invoice_payload),
        ).rowcount == 1
        if not claimed:
            return False

        server_id, purpose = con.execute(
            "SELECT server_id, purpose FROM payments WHERE payload=?",
            (invoice_payload,),
        ).fetchone()
        if purpose != "extend":
            return True

        server = con.execute(
            "SELECT expires_at FROM servers WHERE id=?", (server_id,)
        ).fetchone()
        if server is None:
            logger.warning("Paid extension for missing server %s (%s)", server_id, invoice_payload)
            return True

        # if already expired, start from now
        base = max(int(server[0]), int(time.time()))
        con.execute(
            "UPDATE servers SET expires_at=? WHERE id=?",
            (base + _extend_minutes(invoice_payload) * 60, server_id),
        )
    return True


@app.post("/api/telegram/webhook")
async def telegram_webhook(
    req: Request,
    x_telegram_bot_api_secret_token: str = Header(default=""),
):
    """Handle Telegram updates: answer pre-checkout queries, apply paid invoices.

    Raises:
        HTTPException: 401 when a webhook secret is configured and the header
            does not match; 400 when the body is not a JSON object.
    """
    # Optional security: set secret_token when setting webhook
    if TELEGRAM_WEBHOOK_SECRET:
        # Constant-time comparison on bytes (compare_digest rejects non-ASCII str).
        if not hmac.compare_digest(
            x_telegram_bot_api_secret_token.encode(),
            TELEGRAM_WEBHOOK_SECRET.encode(),
        ):
            raise HTTPException(401, "Bad webhook secret")

    try:
        update = await req.json()
    except ValueError:
        raise HTTPException(400, "Bad update") from None
    if not isinstance(update, dict):
        raise HTTPException(400, "Bad update")

    # 1) pre_checkout_query -> must answer
    if "pre_checkout_query" in update:
        q = update["pre_checkout_query"]
        # NOTE(review): every query is approved without checking that the
        # invoice payload is still valid - confirm this is intended.
        try:
            # The Telegram call blocks, so keep it off the event loop.
            ok = await asyncio.to_thread(tg_answer_pre_checkout, q["id"], True)
        except Exception:
            logger.exception("Failed to answer pre_checkout_query")
            ok = False
        return {"ok": ok}

    # 2) successful_payment in message
    msg = update.get("message") or update.get("edited_message")
    sp = msg.get("successful_payment") if isinstance(msg, dict) else None
    if sp:
        payload = sp.get("invoice_payload", "")
        telegram_charge_id = sp.get("telegram_payment_charge_id", "")
        if payload and telegram_charge_id:
            try:
                applied = await asyncio.to_thread(
                    _apply_successful_payment, payload, telegram_charge_id
                )
                if not applied:
                    logger.info("Ignored duplicate or unknown payment %r", payload)
            except Exception:
                # do not fail webhook hard, but leave a trace for operators
                logger.exception("Failed to apply successful payment %r", payload)

    return {"ok": True}


# =========================
# World download after payment
# =========================
@app.get("/api/world/download")
async def api_world_download(
    server_id: int,
    bg: BackgroundTasks,
    x_init_data: str = Header(default=""),
):
    """Archive the server's world inside its container and send it to the caller.

    Raises:
        HTTPException: 404 when the server is not the caller's, 402 when the
            world export has not been paid, 400 when the container is
            missing, 500 when archiving or copying fails.
    """
    user_id = parse_user_id_from_init_data(x_init_data)
    db_register_user(user_id)

    s = db_get_server_by_id(server_id)
    if not s or s[1] != user_id:
        raise HTTPException(404, "Server not found")

    if not db_has_paid(user_id, server_id, "world_export"):
        raise HTTPException(402, "World download not paid")

    cname = s[2]
    if await asyncio.to_thread(docker_status, cname) == "missing":
        raise HTTPException(400, "Container missing")

    # Create archive inside container
    # Archive includes world directory; you can extend to include server.properties etc.
    try:
        world_dir = WORLD_DIR_IN_CONTAINER.rstrip("/")
        parent = os.path.dirname(world_dir) or "."
        # Quote both path parts so spaces or shell metacharacters in the
        # configured path cannot alter the command.
        script = (
            f"cd {shlex.quote(parent)} && "
            f"tar -czf {_WORLD_ARCHIVE_IN_CONTAINER} {shlex.quote(os.path.basename(world_dir))}"
        )
        cmd = shlex.join(["sh", "-c", script])
        await asyncio.to_thread(docker_exec, cname, cmd)
    except Exception as e:
        raise HTTPException(500, f"Failed to archive world: {e}")

    tmp_dir = tempfile.mkdtemp(prefix="world_")
    out_path = os.path.join(tmp_dir, f"world_{server_id}.tar.gz")

    def cleanup():
        # Removes the archive and anything else left in the temp directory.
        shutil.rmtree(tmp_dir, ignore_errors=True)

    try:
        await asyncio.to_thread(docker_copy_from, cname, _WORLD_ARCHIVE_IN_CONTAINER, out_path)
    except Exception as e:
        cleanup()  # previously the temp directory leaked on this path
        raise HTTPException(500, f"Failed to copy archive: {e}")

    try:
        # Best effort: do not let archives pile up inside the container.
        await asyncio.to_thread(docker_exec, cname, f"rm -f {_WORLD_ARCHIVE_IN_CONTAINER}")
    except Exception:
        logger.warning("Could not remove %s in %s", _WORLD_ARCHIVE_IN_CONTAINER, cname)

    bg.add_task(cleanup)
    return FileResponse(
        out_path,
        media_type="application/gzip",
        filename=f"world_{server_id}.tar.gz",
    )