Telegram bot for tiddlywiki

Ah ha! I hadn’t realized it was this simple. Thank you – this works!

1 Like

So in case this is helpful for anyone, here’s the code that allows me to telegram my bot something like /editfile ATiddlerToEdit ADDED_TEXT! and it will add “ADDED_TEXT!” to ATiddlerToEdit. The extra setup required is to create your telegram bot and create a credentials.env file in the same directory containing TELEGRAM_TOKEN (the telegram token for your bot), GITHUB_TOKEN (a github token for the repo where you’re storing the tiddlywiki), and REPO_OWNER, REPO_NAME and FILE_PATH (which identify the tiddlywiki file within that repo).

My plan is to change this so I have specific bot commands for adding text to specific tiddlers. But I thought I’d upload the code at this stage in case anyone wants to make their own bot with something more like this workflow. Feel free to let me know what looks particularly clunky in this code – I’m always hoping to learn to be a better programmer!


import os
import requests
import re
import json
from dotenv import load_dotenv
import base64
from telegram import Update
from telegram.ext import Application, CommandHandler, CallbackContext

# Read key=value pairs from credentials.env (resolved relative to the current
# working directory) so the os.getenv calls below can see them.
load_dotenv(dotenv_path="credentials.env") 


# Loading environment variables
# Each value is None when the variable is not set; nothing below validates them.
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')  # bot token passed to Application.builder()
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')      # sent in the GitHub Authorization header
REPO_OWNER = os.getenv('REPO_OWNER')          # owner segment of the GitHub contents URL
REPO_NAME = os.getenv('REPO_NAME')            # repository segment of the GitHub contents URL
FILE_PATH = os.getenv('FILE_PATH')            # path of the wiki HTML file inside the repo


# Helper function to modify HTML content
def modify_text_in_html(content, title, new_text):
    """
    Append `new_text` to the `text` field of the tiddler named `title`.

    The tiddlers are read from the JSON array inside the first
    <script class="tiddlywiki-tiddler-store" type="application/json"> tag
    found in `content`.

    Args:
        content: Full HTML of the wiki file, as a string.
        title: Exact title of the tiddler to modify.
        new_text: Text appended (after a single space) to the tiddler's text.

    Returns:
        The HTML with the updated store, or None when the store tag is missing,
        its JSON cannot be decoded, or no tiddler has the given title.
    """
    script_pattern = re.compile(
        r'(<script class="tiddlywiki-tiddler-store" type="application/json">)(.*?)(</script>)',
        re.DOTALL,
    )

    match = script_pattern.search(content)
    if not match:
        return None

    try:
        tiddlers = json.loads(match.group(2))
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None

    for tiddler in tiddlers:
        if tiddler.get("title") == title:
            # A tiddler may have no "text" field at all; treat that as empty
            # text instead of failing with KeyError.
            tiddler["text"] = tiddler.get("text", "") + f" {new_text}"
            break
    else:
        return None  # no tiddler carries the requested title

    # "<" is escaped so the JSON can never contain "</script>" and end the tag early.
    updated_json_str = json.dumps(tiddlers, indent=2).replace("<", "\\u003C")

    # Splice by position so that only the matched store is rewritten, even if
    # the same JSON text happens to occur elsewhere in the document.
    return content[:match.start(2)] + updated_json_str + content[match.end(2):]

async def edit_file(update: Update, context: CallbackContext) -> None:
    """
    Handle `/editfile <title> <new_text>`.

    Fetches the wiki file from GitHub (metadata through the contents API, the
    body through the blob URL it returns), appends `new_text` to the tiddler
    called `title` with modify_text_in_html, and commits the result back
    through the contents API. Every outcome is reported to the user as a
    reply to the triggering message.

    Args:
        update: Incoming Telegram update; replies go to `update.message`.
        context: Handler context; `context.args` holds the words after the command.
    """
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would freeze this handler.
    request_timeout = 30  # seconds

    try:
        # The first word is the tiddler title, everything after it is the text.
        if len(context.args) < 2:
            await update.message.reply_text('Usage: /editfile <title> <new_text>')
            return

        title = context.args[0]
        new_text = ' '.join(context.args[1:])

        # GitHub API request to get the file content metadata
        file_url = f'https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}'
        headers = {
            'Authorization': f'token {GITHUB_TOKEN}',
            'Accept': 'application/vnd.github.v3+json'
        }
        response = requests.get(file_url, headers=headers, timeout=request_timeout)

        if response.status_code != 200:
            print(f"Error {response.status_code}: {response.text}")
            await update.message.reply_text(f"Failed to fetch the file metadata: {response.text}")
            return

        file_data = response.json()

        # File is too large; we need to fetch it via the Blob API using the git_url
        git_url = file_data.get('git_url')
        if not git_url:
            await update.message.reply_text("Unable to retrieve git_url for the file.")
            return

        blob_response = requests.get(git_url, headers=headers, timeout=request_timeout)
        if blob_response.status_code != 200:
            await update.message.reply_text(f"Failed to fetch the file blob: {blob_response.text}")
            return

        file_content_base64 = blob_response.json().get('content')
        if file_content_base64 is None:
            # Report this explicitly instead of letting b64decode(None) raise
            # an opaque TypeError.
            await update.message.reply_text("Failed to fetch the file blob: response has no content.")
            return

        # Decode the base64 content (HTML content in this case)
        file_content = base64.b64decode(file_content_base64).decode('utf-8')

        updated_content = modify_text_in_html(file_content, title, new_text)
        if updated_content is None:
            await update.message.reply_text(f'Title "{title}" not found in the file.')
            return

        encoded_content = base64.b64encode(updated_content.encode('utf-8')).decode('utf-8')

        # The sha of the version we read is sent along with the update request.
        data = {
            'message': f'Updated HTML content for title "{title}" via Telegram bot',
            'content': encoded_content,
            'sha': file_data['sha']
        }

        update_response = requests.put(
            file_url,
            headers=headers,
            data=json.dumps(data, ensure_ascii=True),
            timeout=request_timeout,
        )
        if update_response.status_code == 200:
            await update.message.reply_text(f'HTML content for "{title}" updated successfully!')
        else:
            await update.message.reply_text(f'Failed to update the file: {update_response.json().get("message")}')

    except Exception as e:
        # Top-level boundary of the handler: tell the user what went wrong.
        await update.message.reply_text(f'An error occurred: {str(e)}')

def main() -> None:
    """Create the Telegram application, register /editfile, and start polling."""
    bot = Application.builder().token(TELEGRAM_TOKEN).build()

    # /editfile <title> <new_text> is the only command this bot understands
    editfile_handler = CommandHandler('editfile', edit_file)
    bot.add_handler(editfile_handler)

    bot.run_polling()


if __name__ == '__main__':
    main()
1 Like

And here’s the version where I telegram “b hello world” and it will add “hello world” to the tiddler “b_edit”. In this version of the code, I’ve hardcoded in “a” to edit “a_edit” and “b” to edit “b_edit”.

import os
import requests
import re
import json
from dotenv import load_dotenv
import base64
from telegram import Update
from telegram.ext import Application, ContextTypes, filters, MessageHandler, CommandHandler, CallbackContext

# Read key=value pairs from credentials.env (resolved relative to the current
# working directory) so the os.getenv calls below can see them.
load_dotenv(dotenv_path="credentials.env") 


# Loading environment variables
# Each value is None when the variable is not set; nothing below validates them.
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')  # bot token passed to Application.builder()
GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')      # sent in the GitHub Authorization header
REPO_OWNER = os.getenv('REPO_OWNER')          # owner segment of the GitHub contents URL
REPO_NAME = os.getenv('REPO_NAME')            # repository segment of the GitHub contents URL
FILE_PATH = os.getenv('FILE_PATH')            # path of the wiki HTML file inside the repo


# Helper function to modify HTML content
def modify_text_in_html(content, title, new_text):
    """
    Append `new_text` to the `text` field of the tiddler named `title`.

    The tiddlers are read from the JSON array inside the first
    <script class="tiddlywiki-tiddler-store" type="application/json"> tag
    found in `content`.

    Args:
        content: Full HTML of the wiki file, as a string.
        title: Exact title of the tiddler to modify.
        new_text: Text appended (after a single space) to the tiddler's text.

    Returns:
        The HTML with the updated store, or None when the store tag is missing,
        its JSON cannot be decoded, or no tiddler has the given title.
    """
    script_pattern = re.compile(
        r'(<script class="tiddlywiki-tiddler-store" type="application/json">)(.*?)(</script>)',
        re.DOTALL,
    )

    match = script_pattern.search(content)
    if not match:
        return None

    try:
        tiddlers = json.loads(match.group(2))
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        return None

    for tiddler in tiddlers:
        if tiddler.get("title") == title:
            # A tiddler may have no "text" field at all; treat that as empty
            # text instead of failing with KeyError.
            tiddler["text"] = tiddler.get("text", "") + f" {new_text}"
            break
    else:
        return None  # no tiddler carries the requested title

    # "<" is escaped so the JSON can never contain "</script>" and end the tag early.
    updated_json_str = json.dumps(tiddlers, indent=2).replace("<", "\\u003C")

    # Splice by position so that only the matched store is rewritten, even if
    # the same JSON text happens to occur elsewhere in the document.
    return content[:match.start(2)] + updated_json_str + content[match.end(2):]


async def edit_file(update, title, new_text) -> None:
    """
    Append `new_text` to the tiddler `title` in the wiki file stored on GitHub.

    Fetches the file (metadata through the contents API, the body through the
    blob URL it returns), edits it with modify_text_in_html, and commits the
    result back through the contents API. Every outcome is reported to the
    user as a reply to the triggering message.

    Args:
        update: Incoming Telegram update; replies go to `update.message`.
        title: Title of the tiddler to modify.
        new_text: Text to append to that tiddler.
    """
    # Without an explicit timeout, requests waits indefinitely on a stalled
    # connection, which would freeze this handler.
    request_timeout = 30  # seconds

    try:
        # GitHub API request to get the file content metadata
        file_url = f'https://api.github.com/repos/{REPO_OWNER}/{REPO_NAME}/contents/{FILE_PATH}'
        headers = {
            'Authorization': f'token {GITHUB_TOKEN}',
            'Accept': 'application/vnd.github.v3+json'
        }
        response = requests.get(file_url, headers=headers, timeout=request_timeout)

        if response.status_code != 200:
            print(f"Error {response.status_code}: {response.text}")
            await update.message.reply_text(f"Failed to fetch the file metadata: {response.text}")
            return

        file_data = response.json()

        # File is too large; we need to fetch it via the Blob API using the git_url
        git_url = file_data.get('git_url')
        if not git_url:
            await update.message.reply_text("Unable to retrieve git_url for the file.")
            return

        blob_response = requests.get(git_url, headers=headers, timeout=request_timeout)
        if blob_response.status_code != 200:
            await update.message.reply_text(f"Failed to fetch the file blob: {blob_response.text}")
            return

        file_content_base64 = blob_response.json().get('content')
        if file_content_base64 is None:
            # Report this explicitly instead of letting b64decode(None) raise
            # an opaque TypeError.
            await update.message.reply_text("Failed to fetch the file blob: response has no content.")
            return

        # Decode the base64 content (HTML content in this case)
        file_content = base64.b64decode(file_content_base64).decode('utf-8')

        updated_content = modify_text_in_html(file_content, title, new_text)
        if updated_content is None:
            await update.message.reply_text(f'Title "{title}" not found in the file.')
            return

        encoded_content = base64.b64encode(updated_content.encode('utf-8')).decode('utf-8')

        # The sha of the version we read is sent along with the update request.
        data = {
            'message': f'Updated HTML content for title "{title}" via Telegram bot',
            'content': encoded_content,
            'sha': file_data['sha']
        }

        update_response = requests.put(
            file_url,
            headers=headers,
            data=json.dumps(data, ensure_ascii=True),
            timeout=request_timeout,
        )
        if update_response.status_code == 200:
            await update.message.reply_text(f'HTML content for "{title}" updated successfully!')
        else:
            await update.message.reply_text(f'Failed to update the file: {update_response.json().get("message")}')

    except Exception as e:
        # Top-level boundary of the handler: tell the user what went wrong.
        await update.message.reply_text(f'An error occurred: {str(e)}')

async def edit_a_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Append everything after the leading "a " of the message to the "a_edit" tiddler."""
    title = "a_edit"
    new_text_with_a = update.message.text
    # The handler's regex guarantees the message starts with "a " (any case),
    # so dropping two characters removes exactly that prefix.
    new_text = new_text_with_a[2:]
    await edit_file(update, title, new_text)


# This handler used to be defined under the name edit_a_edit as well, which
# replaced the function above and sent every "a ..." message to "b_edit".
async def edit_b_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Append everything after the leading "b " of the message to the "b_edit" tiddler."""
    title = "b_edit"
    new_text_with_b = update.message.text
    new_text = new_text_with_b[2:]
    await edit_file(update, title, new_text)


def main() -> None:
    """Create the Telegram application, register the message handlers, and start polling."""
    # Initialize the bot application
    application = Application.builder().token(TELEGRAM_TOKEN).build()

    # Messages starting with "a " / "b " (case-insensitive) go to their own tiddler
    application.add_handler(MessageHandler(filters.Regex('(?i)^a .*'), edit_a_edit))
    application.add_handler(MessageHandler(filters.Regex('(?i)^b .*'), edit_b_edit))

    # Start the bot
    application.run_polling()


if __name__ == '__main__':
    main()

I checked the Wayback Machine as a Hail Mary to see if any docs or files exist for this, and no dice. Sadly, I think this gem is lost to time.

1 Like

And here’s a version of the code that works with a tiddlywiki index.html saved on google drive. All you have to do is create a google cloud service account with access to your drive, share the index.html with it, and download the credentials to credentials.json stored in the same directory. (The script also reads TELEGRAM_TOKEN and FILE_ID, the Drive ID of the index.html, from credentials.env.)

import logging
from telegram import Update
from telegram.ext import Application, ContextTypes, filters, MessageHandler, CommandHandler
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload, MediaFileUpload
import io
import os
import json
import re
from dotenv import load_dotenv

# Read key=value pairs from credentials.env (resolved relative to the current
# working directory) so the os.getenv calls below can see them.
load_dotenv(dotenv_path="credentials.env")

# OAuth scope requested for the service-account credentials
SCOPES = ['https://www.googleapis.com/auth/drive']

# Both values are None when the variable is not set; nothing below validates them.
TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')  # bot token passed to Application.builder()
FILE_ID = os.getenv('FILE_ID')                # Drive file ID of the wiki HTML file

def get_drive_service():
    """Return a Drive v3 API client authorised with the key file credentials.json."""
    service_account_creds = Credentials.from_service_account_file(
        'credentials.json',
        scopes=SCOPES,
    )
    return build('drive', 'v3', credentials=service_account_creds)

async def get_file_content(file_id):
    """
    Download the Drive file `file_id` and return its content decoded as UTF-8.

    The file is fetched chunk by chunk into an in-memory buffer.
    """
    drive = get_drive_service()
    media_request = drive.files().get_media(fileId=file_id)

    buffer = io.BytesIO()
    downloader = MediaIoBaseDownload(buffer, media_request)

    # next_chunk() returns (progress, finished); keep going until finished
    finished = False
    while not finished:
        _progress, finished = downloader.next_chunk()

    # Assumes the file holds UTF-8 text
    return buffer.getvalue().decode('utf-8')

async def get_file(update: Update, context=None):
    """
    Handle /getfile: reply with the first 100 characters of the wiki file.

    Args:
        update: Incoming Telegram update; replies go to `update.message`.
        context: Handler context. It is unused, but the handler machinery
            calls callbacks as callback(update, context), so the parameter
            must exist; it defaults to None so one-argument calls still work.

    Returns:
        The full file content, or None if fetching it failed.
    """
    try:
        # The file ID always comes from the FILE_ID setting, not from the command.
        content = await get_file_content(FILE_ID)
        await update.message.reply_text(f"First 100 characters: {content[:100]}")
        return content
    except Exception as e:
        # Top-level boundary of the handler: tell the user what went wrong.
        await update.message.reply_text(f"Error: {e}")

async def modify_text_in_html(update, title, new_text):
    """
    Append `new_text` to the tiddler `title` in the wiki file on Google Drive.

    Downloads the file, looks for the JSON array inside the
    <script class="tiddlywiki-tiddler-store" type="application/json"> tag,
    appends `new_text` (after a single space) to the `text` field of the
    tiddler whose title matches, and uploads the modified file again.

    Args:
        update: Incoming Telegram update; replies go to `update.message`.
        title: Exact title of the tiddler to modify.
        new_text: Text to append.

    Returns:
        Always None; the outcome is reported to the user as a reply.
    """
    content = await get_file_content(FILE_ID)

    script_pattern = re.compile(
        r'(<script class="tiddlywiki-tiddler-store" type="application/json">)(.*?)(</script>)',
        re.DOTALL,
    )

    match = script_pattern.search(content)
    if not match:
        # This is a different failure from an unknown title, so say so.
        await update.message.reply_text("Could not find the tiddler store in the file.")
        return None

    try:
        tiddlers = json.loads(match.group(2))
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # Reply as well, so the user is not left without any answer.
        await update.message.reply_text("The tiddler store in the file is not valid JSON.")
        return None

    for tiddler in tiddlers:
        if tiddler.get("title") == title:
            # A tiddler may have no "text" field at all; treat that as empty
            # text instead of failing with KeyError.
            tiddler["text"] = tiddler.get("text", "") + f" {new_text}"
            break
    else:
        await update.message.reply_text(f'Title "{title}" not found in the file.')
        return None

    # "<" is escaped so the JSON can never contain "</script>" and end the tag early.
    updated_json_str = json.dumps(tiddlers, indent=2).replace("<", "\\u003C")

    # Splice by position so that only the matched store is rewritten.
    modified_content = content[:match.start(2)] + updated_json_str + content[match.end(2):]

    overwrite_drive_file(modified_content)
    await update.message.reply_text(f"Title '{title}' has been updated successfully.")

def overwrite_drive_file(modified_text):
    """
    Replace the content of the Drive file FILE_ID with `modified_text`.

    The text is written to a temporary file in the current working directory,
    uploaded as text/html, and the temporary file is removed afterwards,
    whether or not the upload succeeded.

    Args:
        modified_text: New content of the file, as a string.

    Returns:
        The response of the Drive files().update() call.
    """
    service = get_drive_service()
    temp_file_path = 'temp_modified_file.html'

    try:
        # Write UTF-8 explicitly (the download side decodes UTF-8) and disable
        # newline translation so the uploaded bytes match the text exactly.
        with open(temp_file_path, 'w', encoding='utf-8', newline='') as temp_file:
            temp_file.write(modified_text)

        media = MediaFileUpload(temp_file_path, mimetype='text/html')

        return service.files().update(
            fileId=FILE_ID,
            media_body=media
        ).execute()
    finally:
        # Clean up even when writing or uploading raised.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

async def edit_a_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Append the message text, minus its first two characters, to the "a_edit" tiddler."""
    # Drop the two-character "a " prefix the handler's regex matched on
    text_to_append = update.message.text[2:]
    await modify_text_in_html(update, "a_edit", text_to_append)

async def edit_b_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Append the message text, minus its first two characters, to the "b_edit" tiddler."""
    # Drop the two-character "b " prefix the handler's regex matched on
    text_to_append = update.message.text[2:]
    await modify_text_in_html(update, "b_edit", text_to_append)

def main():
    """Configure logging, register the bot's handlers, and start polling."""
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    bot = Application.builder().token(TELEGRAM_TOKEN).build()

    # "a ..." / "b ..." messages (case-insensitive) append to a tiddler;
    # /getfile replies with the start of the file.
    handlers = (
        MessageHandler(filters.Regex('(?i)^a .*'), edit_a_edit),
        MessageHandler(filters.Regex('(?i)^b .*'), edit_b_edit),
        CommandHandler("getfile", get_file),
    )
    for handler in handlers:
        bot.add_handler(handler)

    bot.run_polling()


if __name__ == '__main__':
    main()

This is all I personally need for this project, but I’d certainly be interested for the sake of fun and curiosity in anything more general you were thinking of, @paotsaq (or anyone else), that would work across saving methods now that I have two very specific programs for two very specific saving methods.

2 Likes

Can something be done for a wiki saved in OneDrive?

Looks like it! See if you have luck following these chatgpt instructions: https://chatgpt.com/share/94a05ee5-7848-424b-bb50-cc5cf2df4192 for the read and write parts of the code I shared. The rest of the code (the part that’s the same between the github and google drive programs) should be the same for yours too. Let me know if you have any questions about the setup for my code (like the telegram bot or anything). :slight_smile:

Will check when I am back home on my desktop.

I guess it will take some time for me to understand the code. Since it’s not an urgent matter currently, I will look into it at a later date. Thanks for sharing the code @noa

Sure! And happy to help with any questions whenever you’d like!

import logging
from telegram import Update
from telegram.ext import Application, ContextTypes, filters, MessageHandler, CommandHandler
import onedrivesdk
import io
import os
import json
import re
from dotenv import load_dotenv

# Read key=value pairs from credentials.env (resolved relative to the current
# working directory) so the os.getenv calls below can see them.
load_dotenv(dotenv_path="credentials.env")

# OneDrive API settings
REDIRECT_URI = 'http://localhost:8080/'      # passed to get_auth_url() and authenticate()
CLIENT_SECRET = os.getenv('CLIENT_SECRET')   # passed to authenticate(); None when unset
CLIENT_ID = os.getenv('CLIENT_ID')           # passed to AuthProvider; None when unset
SCOPES = ['wl.signin', 'wl.offline_access', 'onedrive.readwrite']  # scopes passed to AuthProvider

TELEGRAM_TOKEN = os.getenv('TELEGRAM_TOKEN')  # bot token passed to Application.builder()
FILE_ID = os.getenv('FILE_ID')                # OneDrive item ID of the wiki HTML file

# Authenticated client, created on the first call to get_onedrive_client().
_onedrive_client = None


def get_onedrive_client():
    """
    Return an authenticated OneDrive client, authenticating on first use only.

    The first call prints an authorisation URL and reads the resulting code
    from standard input. The client is then cached, because this function is
    called for every download and every upload and would otherwise block on
    input() each time.

    Returns:
        The authenticated onedrivesdk.OneDriveClient.
    """
    global _onedrive_client
    if _onedrive_client is not None:
        return _onedrive_client

    http_provider = onedrivesdk.HttpProvider()
    auth_provider = onedrivesdk.AuthProvider(http_provider, CLIENT_ID, SCOPES)
    client = onedrivesdk.OneDriveClient('https://api.onedrive.com/v1.0/', auth_provider, http_provider)
    auth_url = client.auth_provider.get_auth_url(REDIRECT_URI)
    print('Paste this URL into your browser, approve the app\'s access.')
    print('Copy everything in the address bar after "code=", and paste it below.')
    print(auth_url)
    code = input('Paste code here: ')
    client.auth_provider.authenticate(code, REDIRECT_URI, CLIENT_SECRET)

    # Cache only after authentication succeeded, so a failed attempt is retried.
    _onedrive_client = client
    return client

async def get_file_content(file_id):
    """Fetch the OneDrive item `file_id` and return its content decoded as UTF-8."""
    onedrive = get_onedrive_client()

    # Request the item itself first; the result is not used further
    onedrive.item(drive='me', id=file_id).get()

    content_response = onedrive.item(drive='me', id=file_id).content.request().get()
    raw_bytes = content_response.content
    # Assumes the file holds UTF-8 text
    return raw_bytes.decode('utf-8')

async def get_file(update: Update, context=None):
    """
    Handle /getfile: reply with the first 100 characters of the wiki file.

    Args:
        update: Incoming Telegram update; replies go to `update.message`.
        context: Handler context. It is unused, but the handler machinery
            calls callbacks as callback(update, context), so the parameter
            must exist; it defaults to None so one-argument calls still work.

    Returns:
        The full file content, or None if fetching it failed.
    """
    try:
        # The file ID always comes from the FILE_ID setting, not from the command.
        content = await get_file_content(FILE_ID)
        await update.message.reply_text(f"First 100 characters: {content[:100]}")
        return content
    except Exception as e:
        # Top-level boundary of the handler: tell the user what went wrong.
        await update.message.reply_text(f"Error: {e}")

async def modify_text_in_html(update, title, new_text):
    """
    Append `new_text` to the tiddler `title` in the wiki file on OneDrive.

    Downloads the file, looks for the JSON array inside the
    <script class="tiddlywiki-tiddler-store" type="application/json"> tag,
    appends `new_text` (after a single space) to the `text` field of the
    tiddler whose title matches, and uploads the modified file again.

    Args:
        update: Incoming Telegram update; replies go to `update.message`.
        title: Exact title of the tiddler to modify.
        new_text: Text to append.

    Returns:
        Always None; the outcome is reported to the user as a reply.
    """
    content = await get_file_content(FILE_ID)

    script_pattern = re.compile(
        r'(<script class="tiddlywiki-tiddler-store" type="application/json">)(.*?)(</script>)',
        re.DOTALL,
    )

    match = script_pattern.search(content)
    if not match:
        # This is a different failure from an unknown title, so say so.
        await update.message.reply_text("Could not find the tiddler store in the file.")
        return None

    try:
        tiddlers = json.loads(match.group(2))
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON: {e}")
        # Reply as well, so the user is not left without any answer.
        await update.message.reply_text("The tiddler store in the file is not valid JSON.")
        return None

    for tiddler in tiddlers:
        if tiddler.get("title") == title:
            # A tiddler may have no "text" field at all; treat that as empty
            # text instead of failing with KeyError.
            tiddler["text"] = tiddler.get("text", "") + f" {new_text}"
            break
    else:
        await update.message.reply_text(f'Title "{title}" not found in the file.')
        return None

    # "<" is escaped so the JSON can never contain "</script>" and end the tag early.
    updated_json_str = json.dumps(tiddlers, indent=2).replace("<", "\\u003C")

    # Splice by position so that only the matched store is rewritten.
    modified_content = content[:match.start(2)] + updated_json_str + content[match.end(2):]

    overwrite_onedrive_file(modified_content)
    await update.message.reply_text(f"Title '{title}' has been updated successfully.")

def overwrite_onedrive_file(modified_text):
    """
    Replace the content of the OneDrive item FILE_ID with `modified_text`.

    The text is written to a temporary file in the current working directory
    and uploaded from there. The upload handle is closed and the temporary
    file removed afterwards, whether or not the upload succeeded.

    Args:
        modified_text: New content of the file, as a string.
    """
    client = get_onedrive_client()
    temp_file_path = 'temp_modified_file.html'

    try:
        # Write UTF-8 explicitly (the download side decodes UTF-8) and disable
        # newline translation so the uploaded bytes match the text exactly.
        with open(temp_file_path, 'w', encoding='utf-8', newline='') as temp_file:
            temp_file.write(modified_text)

        # Upload the modified file back to OneDrive; the with-block closes the
        # handle that was previously left open.
        with open(temp_file_path, 'rb') as upload_file:
            client.item(drive='me', id=FILE_ID).content.request().put(upload_file)
    finally:
        # Clean up even when writing or uploading raised.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

async def edit_a_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Append the message text, minus its first two characters, to the "a_edit" tiddler."""
    # Drop the two-character "a " prefix the handler's regex matched on
    text_to_append = update.message.text[2:]
    await modify_text_in_html(update, "a_edit", text_to_append)

async def edit_b_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Append the message text, minus its first two characters, to the "b_edit" tiddler."""
    # Drop the two-character "b " prefix the handler's regex matched on
    text_to_append = update.message.text[2:]
    await modify_text_in_html(update, "b_edit", text_to_append)

def main():
    """Configure logging, register the bot's handlers, and start polling."""
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    bot = Application.builder().token(TELEGRAM_TOKEN).build()

    # "a ..." / "b ..." messages (case-insensitive) append to a tiddler;
    # /getfile replies with the start of the file.
    handlers = (
        MessageHandler(filters.Regex('(?i)^a .*'), edit_a_edit),
        MessageHandler(filters.Regex('(?i)^b .*'), edit_b_edit),
        CommandHandler("getfile", get_file),
    )
    for handler in handlers:
        bot.add_handler(handler)

    bot.run_polling()


if __name__ == '__main__':
    main()

I used AI (Windows Copilot) to help convert your code for use with OneDrive. I have zero knowledge of Python, so I had to try this option. It will take some time to test it out.