-
ts
@RocketBlaster05 If you still require the youtube_dl module, you can do it as suggested here or alternatively a small edit I did from his script years ago (in order to get it to work, also, let me know if this code should be removed from this post). Every other week or so, you can backup then download to keep it updated
# coding: utf-8 # author: Shaun Hevey # youtube-dl downloader is used to download youtube_dl and patch it to work in Pythonista. # Replace function came from http://stackoverflow.com/questions/39086/search-and-replace-a-line-in-a-file-in-python # Download file was adapted from Python file downloader (https://gist.github.com/89edf288a15fde45682a) import console import os import requests import shutil import tempfile import time import ui import zipfile youtubedl_dir = 'youtube_dl' youtubedl_location = './site-packages/' backup_location = './backup/youtube_dl/' youtubedl_downloadurl = 'https://yt-dl.org/downloads/latest/youtube-dl' youtubedl_unarchive_location = './temp/' files_to_change = [('utils.py','import ctypes','#import ctypes'), ('utils.py','import pipes','#import pipes'), ('YoutubeDL.py','self._err_file.isatty() and ',''), ('downloader/common.py','(\'\\r\\x1b[K\' if sys.stderr.isatty() else \'\\r\')','\'\\r\''), ('extractor/common.py',' and sys.stderr.isatty()','')] def backup_youtubedl(sender): console.show_activity('Checking for youtube-dl') if os.path.isdir(youtubedl_location+youtubedl_dir): console.show_activity('Backing up youtube-dl') if not os.path.exists(backup_location): os.makedirs(backup_location) shutil.move(youtubedl_location+youtubedl_dir,backup_location+youtubedl_dir+ time.strftime('%Y%m%d%H%M%S')) console.hide_activity() @ui.in_background def restore_youtubedl_backup(sender): if not os.path.isdir(backup_location) or not os.listdir(backup_location): console.alert('Nothing to do', 'No backups found to restore') else: folders = os.listdir(backup_location) folder = folders[len(folders)-1] shutil.move(backup_location+folder,youtubedl_location+youtubedl_dir) console.alert('Success','Successfully restored '+folder) def downloadfile(url): localFilename = url.split('/')[-1] or 'download' with open(localFilename, 'wb') as f: r = requests.get(url, stream=True) total_length = r.headers.get('content-length') if total_length: dl = 0 total_length = float(total_length) for chunk in r.iter_content(1024): dl += len(chunk) f.write(chunk) #.setprogress(dl/total_length*100.0) else: f.write(r.content) return localFilename def process_file(path): os.mkdir(youtubedl_unarchive_location) if zipfile.is_zipfile(path): zipfile.ZipFile(path).extractall(path=youtubedl_unarchive_location) @ui.in_background def update_youtubedl(sender): if os.path.exists(youtubedl_location+youtubedl_dir): msg = 'Are you sure you want to update youtubedl exists in site-packages and will be overwritten' if not console.alert('Continue',msg,'OK'): return console.show_activity('Downloading') file = downloadfile(youtubedl_downloadurl) console.show_activity('Extracting') process_file(file) console.show_activity('Moving') if os.path.exists(youtubedl_location+youtubedl_dir): shutil.rmtree(youtubedl_location+youtubedl_dir) shutil.move(youtubedl_unarchive_location+youtubedl_dir, youtubedl_location) console.show_activity('Cleaning Up Download Files') shutil.rmtree(youtubedl_unarchive_location) os.remove(file) console.show_activity('Making youtube-dl friendly') process_youtubedl_for_pythonista() console.hide_activity() def process_youtubedl_for_pythonista(): for patch in files_to_change: filename, old_str, new_str = patch replace_in_file(youtubedl_location+youtubedl_dir+'/'+filename, old_str, new_str) def replace_in_file(file_path, old_str, new_str): with open(file_path, encoding='utf-8') as old_file: #Create temp file fh, abs_path = tempfile.mkstemp() os.close(fh) # close low level and reopen high level with open(abs_path,'w', encoding='utf-8') as new_file: for line in old_file: new_file.write(line.replace(old_str, new_str)) #Remove original file os.remove(file_path) #Move new file shutil.move(abs_path, file_path) def make_button(title, action): button = ui.Button(title=title) button.action = action button.background_color ='lightgrey' button.border_color = 'black' button.border_width = 1 button.flex = 'WB' return button view = ui.View(frame=(0,0,172,132)) view.background_color = 'white' backup_button = make_button(title='Backup YoutubeDL', action=backup_youtubedl) backup_button.center = (view.width * 0.5, view.y+backup_button.height) view.add_subview(backup_button) restore_button = make_button(title='Restore YoutubeDL', action=restore_youtubedl_backup) restore_button.center = (view.width * 0.5, backup_button.y+restore_button.height*1.75) view.add_subview(restore_button) download_button = make_button(title='Download YoutubeDL', action=update_youtubedl) download_button.center = (view.width * 0.5, restore_button.y+download_button.height*1.75) view.add_subview(download_button) view.present('sheet')
-
ts
This seems about right. Instead of swapping out via same property, I just handle the item (one at a time) using a queue
import objc_util import time class NotificationObserver: def __init__(self, name): self.name = objc_util.ns(name) self.center = objc_util.ObjCClass("NSNotificationCenter").defaultCenter() self._observer = None self._blk = None self.queue = objc_util.ObjCClass('NSOperationQueue').new() self.queue.setName_(objc_util.ns('test')) self.ni_objs = [] '''the objc block signature. do not modify''' def _block(self, _cmd, notification): try: ni_obj = objc_util.ObjCInstance(notification).object() self.ni_objs.append(ni_obj) except Exception as ex: print(ex) '''start observing''' def start(self): #print(time.asctime() + ' starting') if self._observer: raise Exception('observer already started') self._blk = objc_util.ObjCBlock(self._block, restype=None, argtypes=[objc_util.c_void_p, objc_util.c_void_p]) self._observer = self.center.addObserverForName_object_queue_usingBlock_(self.name, None, self.queue, self._blk) objc_util.retain_global(self) def stop(self): #print(time.asctime() + ' stopping') if self._observer: self.center.removeObserver_(self._observer) self._observer = None objc_util.release_global(self)
Trying to make the seek call as quick as possible, also cleared the list (really should be removing the item that won’t exist in case you have other player objects) before the next item plays in case another (prior) notification(s) slips through
def looper(player): while v.on_screen: if not paused and not wait: #if hasattr(player, "item_load_time") and player.player.rate() == 0.0 and should_do_loop(): pi = player.item if pi and pi in NotificationObserver.itemDidPlay.ni_objs: NotificationObserver.itemDidPlay.ni_objs.remove(pi) player.seek(secs=0) player.player.play() player.playCount += 1 if player.layer: #console.hud_alert("vp.playCount = {}".format(player.playCount), duration=0.5) pass elif not inf: #Audio player has looped -> next item change_i_by_val(1) update_i_change() #console.hud_alert("ap.playCount = {}".format(player.playCount), duration=0.5) pass time.sleep(refresh_rate) else: time.sleep(refresh_rate_helper)
-
ts
Edited this post because the “.rate() method” I added along with comparing the item (as an OR) is actually worse (I forgot both threads were out of sync lol) than using the applicationState (meaning it kept restarting on a cycle). This leaves me with 2 (maybe more?) options
- Just use
if sa.applicationState():
to control behavior - Somehow hook an observer to observe an object (via my AVPlayer.init()) rather than any (share the same observer)
Edit: I have another idea via same observer (I’ll try it later on)
- Just use
-
ts
Ok so far it’s working, though I’m a bit nervous on running .clear() at the wrong time (not sure if I can attach an observer to an object) or replace itself (the notification instance .object())
import objc_util import time class NotificationObserver: def __init__(self, name): self.name = objc_util.ns(name) self.center = objc_util.ObjCClass("NSNotificationCenter").defaultCenter() self._observer = None self._blk = None self.queue = objc_util.ObjCClass('NSOperationQueue').new() self.queue.setName_(objc_util.ns('test')) self.clear() def clear(self): self.ni = None self.ni_name = None self.ni_obj = None '''the objc block signature. do not modify''' def _block(self, _cmd, notification): try: self.ni = objc_util.ObjCInstance(notification) self.ni_name = self.ni.name() self.ni_obj = self.ni.object() except Exception as ex: print(ex) '''start observing''' def start(self): #print(time.asctime() + ' starting') if self._observer: raise Exception('observer already started') self._blk = objc_util.ObjCBlock(self._block, restype=None, argtypes=[objc_util.c_void_p, objc_util.c_void_p]) self._observer = self.center.addObserverForName_object_queue_usingBlock_(self.name, None, self.queue, self._blk) objc_util.retain_global(self) def stop(self): #print(time.asctime() + ' stopping') if self._observer: self.center.removeObserver_(self._observer) self._observer = None objc_util.release_global(self)
Using it within my script (a portion of it), it does work just as I originally wanted to (thanks again! @JonB)
from NotificationObserver import NotificationObserver observer_names = { "AVPlayerItemDidPlayToEndTimeNotification": "itemDidPlay" } for on in observer_names: name = observer_names[on] setattr(NotificationObserver, name, NotificationObserver(on)) getattr(NotificationObserver, name).start() def app_active(): return not sa.applicationState() == 2 #1 skip #return not sa.isSuspended() #1 skip #return not sa._isResigningActive() #3 skips def should_do_loop(): if keyboard.is_keyboard() or app_active(): return True else: return False def looper(player): while v.on_screen: if not paused and not wait: #if hasattr(player, "item_load_time") and player.player.rate() == 0.0 and should_do_loop(): pi = player.item if pi and pi is NotificationObserver.itemDidPlay.ni_obj: NotificationObserver.itemDidPlay.clear() player.seek(secs=0) player.player.play() player.playCount += 1 if player.layer: #console.hud_alert("vp.playCount = {}".format(player.playCount), duration=0.5) pass elif not inf: #Audio player has looped -> next item change_i_by_val(1) update_i_change() #console.hud_alert("ap.playCount = {}".format(player.playCount), duration=0.5) pass time.sleep(refresh_rate) else: time.sleep(refresh_rate_helper)
-
ts
So modified it a bit to allow other notifications
import objc_util import time class NotificationObserver: def __init__(self, name): self.name = objc_util.ns(name) self.center = objc_util.ObjCClass("NSNotificationCenter").defaultCenter() self._observer = None self._blk = None self.queue = objc_util.ObjCClass('NSOperationQueue').new() self.queue.setName_(objc_util.ns('test')) self.state = False '''define your own callback here''' @objc_util.ui.in_background def callback(self, name, obj, userInfo): print(time.asctime() + name) self.state = True '''the objc block signature. do not modify''' def _block(self, _cmd, notification): try: self.ni = objc_util.ObjCInstance(notification) self.ni_name = self.ni.name() self.ni_obj = self.ni.object() self.ni_ui = self.ni.userInfo() self.callback(self.ni_name.__str__(), self.ni_obj, self.ni_ui) except Exception as ex: print(ex) '''start observing''' def start(self): #print(time.asctime() + ' starting') if self._observer: raise Exception('observer already started') self._blk = objc_util.ObjCBlock(self._block, restype=None, argtypes=[objc_util.c_void_p, objc_util.c_void_p]) self._observer = self.center.addObserverForName_object_queue_usingBlock_(self.name, None, self.queue, self._blk) objc_util.retain_global(self) def stop(self): #print(time.asctime() + ' stopping') if self._observer: self.center.removeObserver_(self._observer) self._observer = None objc_util.release_global(self)
To test it, I did
from NotificationObserver import NotificationObserver observer_names = { "UIApplicationDidBecomeActiveNotification": "didBecomeActive", "UIApplicationDidEnterBackgroundNotification": "didEnterBackground", "UIApplicationWillEnterForegroundNotification": "willEnterForeground", "UIApplicationWillResignActiveNotification": "willResignActive" } for on in observer_names: name = observer_names[on] setattr(NotificationObserver, name, NotificationObserver(on)) getattr(NotificationObserver, name).start()
I also added applicationState() into the mix and got this in the console
sa.applicationState() = 0
...
sa.applicationState() = 1
sa.applicationState() = 1
willResignActive
sa.applicationState() = 1
...
sa.applicationState() = 2
sa.applicationState() = 2
didEnterBackground
sa.applicationState() = 2
...
willEnterForeground
sa.applicationState() = 1
...
sa.applicationState() = 0
sa.applicationState() = 0
didBecomeActive
sa.applicationState() = 0
...So really it’s not much different than
if not sa.applicationState():
Correction: The notification is still good to run code when the app goes into the inactive state, but in my case I actually need something else. Also forgot to mention this (when you are done)
for on in observer_names: getattr(NotificationObserver, observer_names[on]).stop()
-
ts
@JonB Woah! I just tried it out, that’s actually awesome! Thank you :)
-
ts
@cvp Oh I didn’t add the colon at the end of the selector name, now I have a type_encoding. Also I see now, that’s actually cool, I didn’t think about the “original” + selector addition (c.class_addMethod). In the method, you did
rtnval = self.originalinitWithURL_(url) return rtnval.ptr
Is this common to return the pointer or?
-
ts
@JonB So this is pretty new to me though I can sort of see what’s going on, however I’m not sure it will work (I’m not getting a a type_encoding), also can you still run the old method code and the new method code instead of replacing the old code?
sad = objc_util.UIApplication.sharedApplication().delegate() cls_p = objc_util.c.object_getClass(sad.ptr) cls = objc_util.ObjCInstance(cls_p) type_encoding = cls.instanceMethodSignatureForSelector_(objc_util.sel("applicationWillResignActive"))
-
-
ts
Does anyone know a replacement for
objc_util.UIApplication.sharedApplication().applicationState()
I want to check right before you leave Pythonista or before the application is put under sleep mode, cause currently with applicationState, if you pull down the Notification Center, control center, about to switch apps, or in multitasking view, its value changes from 0 to 1
Edit 1: Oh wait there’s another value, 2, for when you leave the app and enter sleep mode
Edit 2: Still brings it back to my initial issue (for when I leave the app / enter sleep), as in it passes my logic when it’s not suppose to (its value is not 2)