PK!|%__pycache__/migration.cpython-312.pycnu[ QgddlmZddlmZddlmZddlZddlZddlmZddlm Z ddlm Z ddlm Z dd lm Z dd lm Z dd lmZdd lmZdd lmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z ddl!m"Z"ddl!m#Z#ddl#m$Z$ddl%m&Z&erNddlm'Z'ddlm(Z(dd l)m*Z*dd!l)m+Z+dd"l,m-Z-dd#l.m/Z/d$d%l0m1Z1dd&l2m3Z3dd'l4m5Z5dd(l4m6Z6dd)l7m8Z8dd*l7m9Z9dd+l7m:Z:ejve<Z=Gd,d-Z>Gd.d/Z?Gd0d1Z@Gd2d3ZAGd4d5ZBGd6d7eBZCGd8d9eBZDy):) annotations)contextmanager) nullcontextN)Any)Callable)cast) Collection)Dict)Iterable)Iterator)List)Optional)Set)Tuple) TYPE_CHECKING)Union)Column)literal_column)select)Engine)url)MockEngineStrategy)ContextManager)ddl)util) sqla_compat) EncodedIO)Dialect)URL) Connection) Transaction)MockConnection) Executable)EnvironmentContext)Config)Script)ScriptDirectory)_RevisionOrBase)Revision) RevisionMapcFeZdZddZed dZd dZd dZd dZd dZ y) _ProxyTransactionc||_yN)migration_context)selfr1s H/opt/hc_python/lib/python3.12/site-packages/alembic/runtime/migration.py__init__z_ProxyTransaction.__init__9s !2c.|jjSr0)r1 _transactionr2s r3_proxied_transactionz&_ProxyTransaction._proxied_transaction<s%%222r5cf|j}|J|jd|j_yr0)r9rollbackr1r7r2ts r3r;z_ProxyTransaction.rollback@s.  % %}} .2+r5cf|j}|J|jd|j_yr0)r9commitr1r7r<s r3r?z_ProxyTransaction.commitFs.  % %}}  .2+r5c|Sr0r8s r3 __enter__z_ProxyTransaction.__enter__Ls r5cz|j/|jj|||d|j_yyr0)r9__exit__r1r7)r2type_value tracebacks r3rDz_ProxyTransaction.__exit__Os9  $ $ 0  % % . .ueY G26D " " / 1r5N)r1MigrationContextreturnNone)rIzOptional[Transaction])rIrJ)rIr.)rErrFrrGrrIrJ) __name__ __module__ __qualname__r4propertyr9r;r?rBrDrAr5r3r.r.8s/3333 3 7r5r.cFeZdZdZ d ddZe d ddZeddZ d ddZ ddZ ddZ ddd Z dd Z dd Zd d Zdd Z d d!dZ d"dZed#dZed$dZ d%dZ d&dZy)'rHaRepresent the database state made available to a migration script. :class:`.MigrationContext` is the front end to an actual database connection, or alternatively a string output stream given a particular database dialect, from an Alembic perspective. 
When inside the ``env.py`` script, the :class:`.MigrationContext` is available via the :meth:`.EnvironmentContext.get_context` method, which is available at ``alembic.context``:: # from within env.py script from alembic import context migration_context = context.get_context() For usage outside of an ``env.py`` script, such as for utility routines that want to check the current version in the database, the :meth:`.MigrationContext.configure` method to create new :class:`.MigrationContext` objects. For example, to get at the current revision in the database using :meth:`.MigrationContext.get_current_revision`:: # in any application, outside of an env.py script from alembic.migration import MigrationContext from sqlalchemy import create_engine engine = create_engine("postgresql://mydatabase") conn = engine.connect() context = MigrationContext.configure(conn) current_rev = context.get_current_revision() The above context can also be used to produce Alembic migration operations with an :class:`.Operations` instance:: # in any application, outside of the normal Alembic environment from alembic.operations import Operations op = Operations(context) op.alter_column("mytable", "somecolumn", nullable=True) Nc||_||_||_|jd|_|jdd}|jd}|jdd|_|jdd|_d|_|r=ttd|j||_ |jJd|_ n!||_ tj||_ |jd |_||_|jd d|_d |vr6t%|jd xst&j(|d |_n%|jd t&j(|_|jd d|_|jdd|_|jddx|_}|jddx|_}|jd|_t7j8j;|||j|j ||j*||_|j<j?|||jdd|_ tBjEd|j<jFjH|j rtBjEdtBjEd|j<jJrdydy)Nscriptas_sqlFtransactional_ddltransaction_per_migrationon_version_applyrAr!fnpurgeoutput_encoding output_buffer compare_typeTcompare_server_default version_tablealembic_versionversion_table_schema starting_revversion_table_pk)r\r^r`zContext impl %s.zGenerating static SQLzWill assume %s DDL. 
transactionalznon-transactional)&environment_contextoptsdialectgetrQ_transaction_per_migrationon_version_apply_callbacksr7rr_stdout_connection connection_in_external_transactionr_get_connection_in_transaction_migrations_fnrRrWrsysstdoutrY_user_compare_type_user_compare_server_defaultr\r^_start_from_revr DefaultImplget_by_dialectimplversion_table_impl_versionloginfo __class__rKrS) r2rdrircrbrRrSr\r^s r3r4zMigrationContext.__init__s$7   15(1C xx%0 HH%89*.(( '+ '+/((3Er*J'37 "&(?(? (KDO??. ..,1D )(DO:::F  ) HHTN  XXgu-  $!*)::&'"D  "&/3::!FD "&((>4"@,0HH $e- ).2XX ..  ]<@88 "D<  !$8/3hh~.FOO227;  OO KK         44'!5!XX&8$?5  #TYY%8%8%A%AB ;; HH, -  !99..  )  r5c||i}|i}|r5t|trtjd|z|j}nl|r,t j |}|jdi|}n>|r/t j d|z}|jdi|}n |s td|Jt||||S)aCreate a new :class:`.MigrationContext`. This is a factory method usually called by :meth:`.EnvironmentContext.configure`. :param connection: a :class:`~sqlalchemy.engine.Connection` to use for SQL execution in "online" mode. When present, is also used to determine the type of dialect in use. :param url: a string database url, or a :class:`sqlalchemy.engine.url.URL` object. The type of dialect to be used will be derived from this if ``connection`` is not passed. :param dialect_name: string name of a dialect, such as "postgresql", "mssql", etc. The type of dialect to be used will be derived from this if ``connection`` and ``url`` are not passed. :param opts: dictionary of options. Most other options accepted by :meth:`.EnvironmentContext.configure` are passed via this dictionary. 
zf'connection' argument to configure() is expected to be a sqlalchemy.engine.Connection instance, got %rz%s://z-Connection, url, or dialect_name is required.rA) isinstancerr CommandErrorrdsqla_urlmake_url get_dialect ExceptionrH) clsrir dialect_namerdrb dialect_optsrcurl_objs r3 configurezMigrationContext.configures> <D  L *f-'')* !((G '',G+g))+;l;G '',(>?G+g))+;l;GKL L"""T;NOOr5c#ZK|j}|jjr'|jr|jj n1|r/|j J|j j d|_|jsw|jJ|jj}|j}|jdx|_|j_|jj}nd} d|jsT|jJ||j |jjx|_|j_|jjr'|jr|jjy|r.|jJ|jj|_yy#|jsT|jJ||j |jjx|_|j_|jjr'|jr|jjw|r.|jJ|jj|_wwxYww)aLEnter an "autocommit" block, for databases that support AUTOCOMMIT isolation levels. This special directive is intended to support the occasional database DDL or system operation that specifically has to be run outside of any kind of transaction block. The PostgreSQL database platform is the most common target for this style of operation, as many of its DDL operations must be run outside of transaction blocks, even though the database overall supports transactional DDL. The method is used as a context manager within a migration script, by calling on :meth:`.Operations.get_context` to retrieve the :class:`.MigrationContext`, then invoking :meth:`.MigrationContext.autocommit_block` using the ``with:`` statement:: def upgrade(): with op.get_context().autocommit_block(): op.execute("ALTER TYPE mood ADD VALUE 'soso'") Above, a PostgreSQL "ALTER TYPE..ADD VALUE" directive is emitted, which must be run outside of a transaction block at the database level. The :meth:`.MigrationContext.autocommit_block` method makes use of the SQLAlchemy ``AUTOCOMMIT`` isolation level setting, which against the psycogp2 DBAPI corresponds to the ``connection.autocommit`` setting, to ensure that the database driver is not inside of a DBAPI level transaction block. .. warning:: As is necessary, **the database transaction preceding the block is unconditionally committed**. 
This means that the run of migrations preceding the operation will be committed, before the overall migration operation is complete. It is recommended that when an application includes migrations with "autocommit" blocks, that :paramref:`.EnvironmentContext.transaction_per_migration` be used so that the calling environment is tuned to expect short per-file migrations whether or not one of them has an autocommit block. N AUTOCOMMIT)isolation_level) _in_connection_transactionrtrSrR emit_commitr7r?riget_isolation_levelexecution_optionsbegin emit_begin)r2r current_levelbase_connection fake_transs r3autocommit_blockz!MigrationContext.autocommit_blocks7Z&*%D%D%F" 99 & &4;; II ! ! # '$$0 00    $ $ & $D {{??. .. OO??AM"ooO 11,1O DOdii2 150E0E0GJJ < ;;222)%%'11$12:IH$))"6yy**t{{ $$&+222$(OO$9$9$;!,;;222)%%'11$12:IH$))"6yy**t{{ $$&+222$(OO$9$9$;!,s DJ+G CJ+CJ((J+cfjr tSjjr|jk(}n|du}|s tSjjsq|sJj r tSj du}|r tSjJtjj_tSj rtfd}|SjJtjj_tS)aBegin a logical transaction for migration operations. This method is used within an ``env.py`` script to demarcate where the outer "transaction" for a series of migrations begins. Example:: def run_migrations_online(): connectable = create_engine(...) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata ) with context.begin_transaction(): context.run_migrations() Above, :meth:`.MigrationContext.begin_transaction` is used to demarcate where the outer logical transaction occurs around the :meth:`.MigrationContext.run_migrations` operation. A "Logical" transaction means that the operation may or may not correspond to a real database transaction. 
If the target database supports transactional DDL (or :paramref:`.EnvironmentContext.configure.transactional_ddl` is true), the :paramref:`.EnvironmentContext.configure.transaction_per_migration` flag is not set, and the migration is against a real database connection (as opposed to using "offline" ``--sql`` mode), a real transaction will be started. If ``--sql`` mode is in effect, the operation would instead correspond to a string such as "BEGIN" being emitted to the string output. The returned object is a Python context manager that should only be used in the context of a ``with:`` statement as indicated above. The object has no other guaranteed API features present. .. seealso:: :meth:`.MigrationContext.autocommit_block` TNc3~Kjjdjjywr0)rtrrr8sr3 begin_commitz8MigrationContext.begin_transaction..begin_commits* $$& %%'s:=) rjrrtrSrfrRr7rir"_safe_begin_connection_transactionr.r)r2_per_migrationtransaction_nowin_transactionrs` r3begin_transactionz"MigrationContext.begin_transactionqsX  ( (= 99 & &,0O0OOO,4O= ,,! !>{{"}$"&!2!2$!>!&=(??666#FF OO% -T22 [[  ( (  > !??. .. + N N!D %T* *r5c|j}t|dk(ryt|dkDr"tjd|jz|dS)aReturn the current revision, usually that which is present in the ``alembic_version`` table in the database. This method intends to be used only for a migration stream that does not contain unmerged branches in the target database; if there are multiple branches present, an exception is raised. The :meth:`.MigrationContext.get_current_heads` should be preferred over this method going forward in order to be compatible with branch migration support. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned instead, if any. rNr%zQVersion table '%s' has more than one head present; please use get_current_heads())get_current_headslenrr|r\r2headss r3get_current_revisionz%MigrationContext.get_current_revisions] &&( u:? 
Z!^##1373E3EF  8Or5cL|jr|j}|dk(rd}nW|U|jrItj|Dcgc]+}|dvr%|jj |j -}}tj|dS|jrtjd|jsy|jJtd|jjt|jjj DScc}w)aReturn a tuple of the current 'head versions' that are represented in the target database. For a migration stream without branches, this will be a single value, synonymous with that of :meth:`.MigrationContext.get_current_revision`. However when multiple unmerged branches exist within the target database, the returned tuple will contain a value for each head. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned in a one-length tuple. If no version table is present, or if there are no revisions present, an empty tuple is returned. baseN)NrrAdefaultzECan't specify current_rev to context when using a database connectionc3&K|] }|d yw)rNrA).0rows r3 z5MigrationContext.get_current_heads..s   Fs)rRrqrQrto_list get_revisionrevisionto_tupler|_has_version_tableritupleexecuterrvc version_num)r2start_from_revsfrs r3rz"MigrationContext.get_current_headss$ ;;"&"6"6N'!%+  $||N;";.0KK,,S1::;" ==< <##''7**,*** ..t}}223   "s0D!c\tj|j5|jJ|jj |jd|rA|jJ|jj |jj dddy#1swYyxYw)NT) checkfirst)r_ensure_scope_for_ddlrirvcreaterdelete)r2rWs r3_ensure_version_tablez&MigrationContext._ensure_version_tables~  . .t ???. .. MM T B222'' (<(<(>? @ ? ?s A9B""B+c|jJtj|j|j|jSr0)rir_connectable_has_tabler\r^r8s r3rz#MigrationContext._has_version_table%s<***11 OOT//1J1J  r5c|j}|js|s|jt||}|j ||D]}|j |y)a\Stamp the version table with a specific revision. This method calculates those branches to which the given revision can apply, and updates those branches as though they were migrated towards that revision (either up or down). If no current branches include the revision, it is added as a new branch head. 
N)rrRrHeadMaintainer _stamp_revsupdate_to_step)r2script_directoryrrhead_maintainersteps r3stampzMigrationContext.stamp+sY&&({{5  & & ((u5$005AD  * *4 0Br5c |jj|jr6|jrt j d|j dd}nL|j}|jjdd}|js|s|s|j t||}|jJ|j||D]}|jd5|jr?|js3|jJ|jj!|jt"j%d ||jr(|jj'd |j(|j*di||j-||j.D]+}|||j$t1|j| - ddd|jrA|js4|jJ|jj3|jyyy#1swYmxYw) a-Run the migration scripts established for this :class:`.MigrationContext`, if any. The commands in :mod:`alembic.command` will set up a function that is ultimately passed to the :class:`.MigrationContext` as the ``fn`` argument. This function represents the "work" that will be done when :meth:`.MigrationContext.run_migrations` is called, typically from within the ``env.py`` script of the migration environment. The "work function" then provides an iterable of version callables and other version information which in the case of the ``upgrade`` or ``downgrade`` commands are the list of version scripts to invoke. Other commands yield nothing, in the case that a command wants to run some other operation against the database such as the ``current`` or ``stamp`` commands. :param \**kw: keyword arguments here will be passed to each migration callable, that is the ``upgrade()`` or ``downgrade()`` method within revision scripts. z!Can't use --purge with --sql modeT)rWrA dont_mutateFN)rz Running %sz -- Running )ctxrrrun_args)rtstart_migrationsrWrRrr|rrrcrerrlrrrirvrrwrx static_output short_log migration_fnrrgsetdrop)r2kwrrrrcallbacks r3run_migrationszMigrationContext.run_migrations;s* ""$ ::{{''(KLL  & &T & 2E**,E))-- u=K;;u[**,(u5""...''t4D''t'<;;'<'< ??666MM((9t,;;II+++/>>;"!!'B' ..t4 $ ? ?H !YY!/"7"78!# !@'=<58 ;;44??. .. MM  t / 5;7=.dumps IIOOI &r5)rr#rd)r2rirs` r3rhz#MigrationContext._stdout_connections! '"00tDDr5c|jS)aReturn the current "bind". 
In online mode, this is an instance of :class:`sqlalchemy.engine.Connection`, and is suitable for ad-hoc execution of any kind of usage described in SQLAlchemy Core documentation as well as for usage with the :meth:`sqlalchemy.schema.Table.create` and :meth:`sqlalchemy.schema.MetaData.create_all` methods of :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.MetaData`. Note that when "standard output" mode is enabled, this bind will be a "mock" connection handler that cannot return results and is only appropriate for a very limited subset of commands. )rir8s r3bindzMigrationContext.binds&r5cH|jr|jjSy)zLReturn the :class:`.Config` used by the current environment, if any.N)rbconfigr8s r3rzMigrationContext.configs"  # #++22 2r5c|jduryt|jr-|j||||j|j}||S|jj ||Sr)rocallabletypertrZ)r2inspector_columnmetadata_column user_values r3 _compare_typezMigrationContext._compare_typest  " "e + D++ ,00  %%$$ J%!!yy%%&6HHr5c|jduryt|jr$|j|||||j|}||S|jj ||||Sr)rprserver_defaultrtr[)r2rrrendered_metadata_defaultrendered_column_defaultrs r3_compare_server_defaultz(MigrationContext._compare_server_defaults|  , , 5 D55 6:: '..) 
J%!!yy//   % #   r5r0) rdrriOptional[Connection]rczDict[str, Any]rbOptional[EnvironmentContext]rIrJ)NNNNNNN)rirrzOptional[Union[str, URL]]r Optional[str]rdzOptional[Dialect]rbrrzOptional[Dict[str, str]]rcz Optional[Any]rIrH)rIzIterator[None])F)rboolrIz>Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]rIrrITuple[str, ...])rWrrIrJrIr)rr)rstrrIrJrrrIrJ)rzUnion[Executable, str]rzOptional[Dict[str, Any]]rIrJ)rirrIr#)rIr)rIzOptional[Config])r Column[Any]rrrIr) rrrrrrrrrIr)rKrLrM__doc__r4 classmethodrrrrrrrrrrrrrhrNrrrrrAr5r3rHrHUs-h=A T T )T  T : T  T l,0)-&*%)<@15"5P(5P'5P$ 5P # 5P : 5P/5P5P 5P5Pn[<[<|&+b+"b+ Gb+H6+ Z@ 1 F0P7; 0 # 04 0  0E.E E(I +I>DI I& % % $1  "/    r5rHc4eZdZddZddZddZd dZd dZy) rc2||_t||_yr0)contextrr)r2rrs r3r4zHeadMaintainer.__init__s Z r5c ||jvsJ|jj||jjj |jj j jtd|zy)N'%s'r) raddrrtrrvinsertvaluesr)r2versions r3_insert_versionzHeadMaintainer._insert_versionsodjj((( w  LL ! ! ( ( * 1 1*6G+;< 2  r5c Z|jj||jjj |jj j j|jj jjtd|zk(}|jjsm|jjjrL|I|jdk7r9tj d||jj"|jfzyyyy)Nrr%zOOnline migration expected to match one row when deleting '%s' in '%s'; %d found)rremoverrtrrvrwhererrrrRrdsupports_sane_rowcountrowcountrr|r\)r2rrets r3_delete_versionzHeadMaintainer._delete_versions '"ll%% LL ! ! ( ( * 0 0 %%''33!&7"234   ## $$;; !##DLL66 EF " <$r5c ||jvsJ|jj||jj||jjj |jj jjtd|zj|jj jjtd|zk(}|jjsn|jjjrM|J|j dk7r:t#j$d|||jj&|j fzyyyy)Nrrr%zWOnline migration expected to match one row when updating '%s' to '%s' in '%s'; %d found)rrrrrtrrvupdaterrr rrrRrdr r rr|r\)r2from_to_r s r3_update_versionzHeadMaintainer._update_versions0$**$$$ %  sll%% LL ! ! ( ( * Vv| `.)r,r9r+r8s r3r/zMigrationInfo.down_revisionss!  ..t/E/EFFr5cL|jj|jS)zdGet :attr:`~MigrationInfo.source_revision_ids` as a tuple of :class:`Revisions <.Revision>`.)r,r9r3r8s r3source_revisionszMigrationInfo.source_revisionss!  
..t/G/GHHr5cL|jj|jS)ziGet :attr:`~MigrationInfo.destination_revision_ids` as a tuple of :class:`Revisions <.Revision>`.)r,r9r5r8s r3destination_revisionsz#MigrationInfo.destination_revisionss!  ..t/L/LMMr5N) r,r,r'rr(rr.Union[str, Tuple[str, ...]]r/r?rIrJrr)rIzOptional[Revision])rIz%Tuple[Optional[_RevisionOrBase], ...])rKrLrMr__annotations__r4rNr1r3r5r7r.r/r<r>rAr5r3r&r&asBN6"! %$'&AK!KK K 2 K 4 K K*!!    CCEE GG II NNr5r&ceZdZUded<ded<ded<ded<er eddZedd Ze dd Z e dd Z edd Z edd Z dZ y) MigrationSteprfrom_revisions_no_depsto_revisions_no_depsrr'rrcyr0rAr8s r3doczMigrationStep.docs(+r5c.|jjSr0)rrKr8s r3namezMigrationStep.names  )))r5ct||dS)NT RevisionSteprr,rQs r3upgrade_from_scriptz!MigrationStep.upgrade_from_scriptsL&$77r5ct||dSrrJrLs r3downgrade_from_scriptz#MigrationStep.downgrade_from_scriptsL&%88r5c|j Sr0)r'r8s r3 is_downgradezMigrationStep.is_downgrades??""r5c|jdtj|jdtj|jS)N  -> )rHrformat_as_commarCrDr8s r3rzMigrationStep.short_logs= II  !%>?  >> !r5NrrIr)r,r,rQr(rIrKr)rKrLrMr@rrNrFrHrrMrOrQrrWrAr5r3rBrBs++)) + + **8&8068 88 9&9069 99 ##   "r5rBc,eZdZ ddZdZddZeddZeddZe ddZ eddZ e ddZ edd Z dd Z dd Zdd Z dd ZddZddZddZddZed dZed dZed!dZy)"rKc||_||_||_|r|jj|_y|jj |_yr0)r,rr'moduleupgrader downgrade)r2r,rr's r3r4zRevisionStep.__init__sA)  $  ( 7 7D  ( 9 9D r5cPd|jjd|jdS)Nz RevisionStep(z , is_upgrade=))rr'r8s r3__repr__zRevisionStep.__repr__#s MM " " OO  r5ct|txr4|j|jk(xr|j|jk(Sr0)r{rKrr'r2others r3__eq__zRevisionStep.__eq__)s= ul + 4$--/ 45#3#33 r5c.|jjSr0)rrFr8s r3rFzRevisionStep.doc0s}}   r5ct|jr|jjS|jjfSr0r'r_normalized_down_revisionsr8s r3from_revisionszRevisionStep.from_revisions4s, ??==;; ;MM**, ,r5ct|jr|jjS|jjfSr0r'r_versioned_down_revisionsr8s r3rCz#RevisionStep.from_revisions_no_deps;s. ??==:: :MM**, ,r5ct|jr|jjfS|jjSr0rgr8s r3 to_revisionszRevisionStep.to_revisionsDs, ??MM**, ,==;; ;r5ct|jr|jjfS|jjSr0rkr8s r3rDz!RevisionStep.to_revisions_no_depsKs. 
??MM**, ,==:: :r5cFt|jjdk(SNr%)rrrhr8s r3_has_scalar_down_revisionz&RevisionStep._has_scalar_down_revisionTs4==;;<AAr5c|jsy|jj|vry|jj}|sy|j|}| S)zA delete is when we are a. in a downgrade and b. we are going to the "base" or we are going to a version that is implied as a dependency on another version that is remaining. FT)rQrrh_unmerge_to_revisions)r2rdownrevsrns r3rz!RevisionStep.should_delete_branchXsU    == ! ! .==;; 55e>%%33K@?A   "D''(33I>N"$"5"56N "% & 2    a   sCc t|j|jjg}|rz|jj |jj |dDchc]}|j}}t t|jj|S|jDchc]X}|jj |jj |dD]}|j|k7r |jZ}}}t t|jj|Scc}wcc}}w)NFrw)rrzrr,r{r9rrn)r2rr}r~r to_revisions r3rtz"RevisionStep._unmerge_to_revisionssA%j++T]]-C-C,DE **>>%%33K@?A   T../::9EF F$(#4#4#4K**>>%%33K@?A::,  #4 T../::9EF F's *E;AE cT|j|}|jd|d|ddfSNrry)rtri)r2rrns r3rz"RevisionStep.unmerge_branch_identss@11%8     "   2    r5ct|jsy|jj}|sy|j|syy)NFT)r'rrh intersectionr2rrus r3rz!RevisionStep.should_create_branchs7==;; %%h/r5c|jsy|jj}t|dkDrt|j |dkDryyNFr%T)r'rrhrrrs r3rz"RevisionStep.should_merge_branchessD==;; x=1 U%7%7%A!BQ!Fr5c|jsy|jj}|jj|vrt|dkDryyr)rQrrhrrs r3rz$RevisionStep.should_unmerge_branchess@  ==;; == ! !U *s8}q/@r5cV|jsI|j|jj}t |dk(sJdt |d}n|jjd}|j r||jjfS|jj|fS)Nr%z4Can't do an UPDATE because downrevision is ambiguousr)rrrrrhrr|r')r2rdownrev down_revisions r3rzRevisionStep.update_version_nums--(( 88GG ! FE F! M!,M MMDDQGM ?? $--"8"88 8==))=8 8r5c.|jjSr0rr8s r3rzRevisionStep.delete_version_num}}%%%r5c.|jjSr0rr8s r3rzRevisionStep.insert_version_numrr5ct|j|jj|jj|jdS)NFr,r.r/r'r()r&r,rrhr'r8s r3rxzRevisionStep.infos<**//==CC   r5N)r,r,rr(r'rrIrJ)rcobjectrIrrrrrSet[str]rIr)rrrIzTuple[List[str], str, str])rrrIr)rrrIz Tuple[str, str, Tuple[str, ...]]rrrIzTuple[str, str]rXrIr&)rKrLrMr4r`rdrNrFrirCrnrDrrrrrtrrrrrrrrxrAr5r3rKrKs@ :' :39 :GK :  :  !!-- - --<< ; ;;BB$.  
# 2G0    )  &  9"&&&&  r5rKceZdZU d ddZdZded<ddZdZedZ eddZ e dd Z e dd Z edd Z edd Zdd Z ddZ ddZddZddZddZddZeddZy) StampStepNctj|d|_tj|d|_||_||_|j |_||_y)NrAr) rrrrr' branch_movestamp_revisionrr,)r2rrr'rr,s r3r4zStampStep.__init__sM'+mmE2&F $(MM#r$B$& //(r5rrFc yr0rA)r2rs r3rzStampStep.stamp_revision sr5ct|txrj|j|jk(xrO|j|jk(xr4|j|jk(xr|j |j k(Sr0)r{rrirnrr'rbs r3rdzStampStep.__eq__sw ui ( 4$$(;(;; 4""d&7&77 4!!T%5%55 45#3#33  r5c|jSr0rr8s r3rizStampStep.from_revisionss zzr5c|jSr0rr8s r3rnzStampStep.to_revisionss xxr5c|jSr0rr8s r3rCz StampStep.from_revisions_no_depsszzr5c|jSr0rr8s r3rDzStampStep.to_revisions_no_deps%s xxr5cTt|jdk(sJ|jdSNr%rrrr8s r3rzStampStep.delete_version_num+s&4::!###zz!}r5cTt|jdk(sJ|jdSrrrr8s r3rzStampStep.insert_version_num0s&488}!!!xx{r5ct|jdk(sJt|jdk(sJ|jd|jdfSr)rrrrs r3rzStampStep.update_version_num5sI4::!###488}!!!zz!}dhhqk))r5clt|jdd|jd|jdfSr)r|rrrs r3rzStampStep.merge_branch_idents:s7 Ab! " JJrN HHQK   r5cl|jd|jdt|jddfSr)rrr|rs r3rzStampStep.unmerge_branch_identsDs6 JJqM HHRL !B   r5c6|jxr |jSr0)rQrrs r3rzStampStep.should_delete_branchNs  5T%5%55r5c|jxrX|jxs$t|jj |xr$t|j j |Sr0)r'rrrrzrrs r3rzStampStep.should_create_branchTsO OO 0!!FS_%?%?%F 0DHH ((/ r5c2t|jdkDSrqrrs r3rzStampStep.should_merge_branches[s4::""r5c2t|jdkDSrqrrs r3rz!StampStep.should_unmerge_branches^s488}q  r5c|jr|j|jfn|j|jf\}}|jJt |j|||jdS)NTr)r'rrr,r&)r2updowns r3rxzStampStep.infoaspXXtzz "**dhh' D   ,,,**   r5r0) r%Optional[Union[str, Collection[str]]]rrr'rrrr,zOptional[RevisionMap]rIrJrrrXr)rzUnion[Set[str], List[str]]rIz=Union[Tuple[List[Any], str, str], Tuple[List[str], str, str]])rrrIzTuple[str, str, List[str]]r)rrrIzUnion[Set[str], bool]r)rKrLrMr4rFr@rrdrNrirnrCrDrrrrrrrrrrxrAr5r3rrs;/3 )4 )3 ) )  ) , )  )C     *  / F   # 6  #!    
r5r)E __future__r contextlibrrloggingrmtypingrrrr r r r r rrrrr sqlalchemyrrrsqlalchemy.enginerrr}sqlalchemy.engine.strategiesrtyping_extensionsrrrr util.compatrrr sqlalchemy.engine.baser!r"sqlalchemy.engine.mockr#sqlalchemy.sqlr$ environmentr&rr' script.baser(r)script.revisionr*r+r, getLoggerrKrwr.rHrr&rBrKrrAr5r3rs#%"  %$-;,#)%125)/$-1*-g!77:Z  Z  zl-l-^@N@NF0"0"f_ =_ Dv v r5PK!'__pycache__/environment.cpython-312.pycnu[ Qg4ddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlm Z dd lm Z dd lm Z dd lm Z dd lm Z dd lmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZddlmZerHddl m!Z!ddl"m#Z#ddl$m%Z%ddlm&Z&ddlm'Z'dd l(m)Z)dd!lm*Z*dd"l+m,Z,dd#l-m.Z.dd$l/m0Z0dd%l1m2Z2dd&l3m4Z4e ee5ee5d'ffZ6eeeed(gd)fZ7ee5ed*gee5ed+ffZ8ed,Z9e ed-e e5fZ:ee e5e9e:ge;fZeed0d0e e5e ee e5ge e;fZ?eed0d0d1d1ge e;fZ@Gd2d3ejZBy))4) annotations)Any)Callable) Collection)Dict)List)Mapping)MutableMapping)Optional)overload)Sequence)TextIO)Tuple) TYPE_CHECKING)Union)Column) FetchedValue)ContextManager)Literal)_ProxyTransaction)MigrationContext)util) Operations) _GetRevArg)URL) Connection) Executable)MetaData) SchemaItem) TypeEngine) MigrationInfo)AutogenContext)Config) DefaultImpl)MigrationScript)ScriptDirectory.r'Nr$F)schematablecolumnindexunique_constraintforeign_key_constraint) schema_name table_nameschema_qualified_table_namer!r#z Column[Any]zTypeEngine[Any]ceZdZUdZdZded<dZded< dZded< d d Zd!d Z d"d Z d#d Z d#d Z d#dZ d$dZd$dZd$dZd$dZd%dZed&dZe d'dZe d( d)dZ d* d)dZ d+ d,dZd-dZ d. d/dZd0dZ d1dZd2dZd3dZd4dZy)5EnvironmentContextan A configurational facade made available in an ``env.py`` script. The :class:`.EnvironmentContext` acts as a *facade* to the more nuts-and-bolts objects of :class:`.MigrationContext` as well as certain aspects of :class:`.Config`, within the context of the ``env.py`` script that is invoked by most Alembic commands. :class:`.EnvironmentContext` is normally instantiated when a command in :mod:`alembic.command` is run. 
It then makes itself available in the ``alembic.context`` module for the scope of the command. From within an ``env.py`` script, the current :class:`.EnvironmentContext` is available by importing this module. :class:`.EnvironmentContext` also supports programmatic usage. At this level, it acts as a Python context manager, that is, is intended to be used using the ``with:`` statement. A typical use of :class:`.EnvironmentContext`:: from alembic.config import Config from alembic.script import ScriptDirectory config = Config() config.set_main_option("script_location", "myapp:migrations") script = ScriptDirectory.from_config(config) def my_function(rev, context): '''do something with revision "rev", which will be the current database revision, and "context", which is the MigrationContext that the env.py will create''' with EnvironmentContext( config, script, fn=my_function, as_sql=False, starting_rev="base", destination_rev="head", tag="sometag", ): script.run_env() The above script will invoke the ``env.py`` script within the migration environment. If and when ``env.py`` calls :meth:`.MigrationContext.run_migrations`, the ``my_function()`` function above will be called by the :class:`.MigrationContext`, given the context itself as well as the current revision in the database. .. note:: For most API usages other than full blown invocation of migration scripts, the :class:`.MigrationContext` and :class:`.ScriptDirectory` objects can be created and used directly. The :class:`.EnvironmentContext` object is *only* needed when you need to actually invoke the ``env.py`` module present in the migration environment. NzOptional[MigrationContext]_migration_contextr%configr(scriptc .||_||_||_y)a^Construct a new :class:`.EnvironmentContext`. :param config: a :class:`.Config` instance. :param script: a :class:`.ScriptDirectory` instance. 
:param \**kw: keyword options that will be ultimately passed along to the :class:`.MigrationContext` when :meth:`.EnvironmentContext.configure` is called. N)r5r6 context_opts)selfr5r6kws J/opt/hc_python/lib/python3.12/site-packages/alembic/runtime/environment.py__init__zEnvironmentContext.__init__s  c&|j|S)zEstablish a context which provides a :class:`.EnvironmentContext` object to env.py scripts. The :class:`.EnvironmentContext` will be made available as ``from alembic import context``. )_install_proxyr9s r; __enter__zEnvironmentContext.__enter__s  r=c$|jyN) _remove_proxy)r9argr:s r;__exit__zEnvironmentContext.__exit__s r=c:|jjddS)a!Return True if the current migrations environment is running in "offline mode". This is ``True`` or ``False`` depending on the ``--sql`` flag passed. This function does not require that the :class:`.MigrationContext` has been configured. as_sqlFr8getr@s r;is_offline_modez"EnvironmentContext.is_offline_modes  $$Xu55r=cJ|jjjS)aReturn True if the context is configured to expect a transactional DDL capable backend. This defaults to the type of database in use, and can be overridden by the ``transactional_ddl`` argument to :meth:`.configure` This function requires that a :class:`.MigrationContext` has first been made available via :meth:`.configure`. ) get_contextimpltransactional_ddlr@s r;is_transactional_ddlz'EnvironmentContext.is_transactional_ddls!&&888r=c$|j SrC)rKr@s r;requires_connectionz&EnvironmentContext.requires_connections'')))r=c8|jjdS)aReturn the hex identifier of the 'head' script revision. If the script directory has multiple heads, this method raises a :class:`.CommandError`; :meth:`.EnvironmentContext.get_head_revisions` should be preferred. This function does not require that the :class:`.MigrationContext` has been configured. .. 
seealso:: :meth:`.EnvironmentContext.get_head_revisions` headr6as_revision_numberr@s r;get_head_revisionz$EnvironmentContext.get_head_revisions{{--f55r=c8|jjdS)aReturn the hex identifier of the 'heads' script revision(s). This returns a tuple containing the version number of all heads in the script directory. This function does not require that the :class:`.MigrationContext` has been configured. headsrUr@s r;get_head_revisionsz%EnvironmentContext.get_head_revisionss{{--g66r=c|j3|jj|jjSd|j vr(|jj|j dSt jd)aTReturn the 'starting revision' argument, if the revision was passed using ``start:end``. This is only meaningful in "offline" mode. Returns ``None`` if no value is available or was configured. This function does not require that the :class:`.MigrationContext` has been configured. starting_revz+No starting revision argument is available.)r4r6rVrM_start_from_revr8r CommandErrorr@s r;get_starting_revision_argumentz1EnvironmentContext.get_starting_revision_arguments  " " .;;11  "22 t00 0;;11!!.1 ##= r=cR|jj|jdS)aGet the 'destination' revision argument. This is typically the argument passed to the ``upgrade`` or ``downgrade`` command. If it was specified as ``head``, the actual version number is returned; if specified as ``base``, ``None`` is returned. This function does not require that the :class:`.MigrationContext` has been configured. destination_rev)r6rVr8r@s r;get_revision_argumentz(EnvironmentContext.get_revision_argument1s*{{--   / 0  r=c:|jjddS)agReturn the value passed for the ``--tag`` argument, if any. The ``--tag`` argument is not used directly by Alembic, but is available for custom ``env.py`` configurations that wish to use it; particularly for offline generation scripts that wish to generate tagged filenames. This function does not require that the :class:`.MigrationContext` has been configured. .. seealso:: :meth:`.EnvironmentContext.get_x_argument` - a newer and more open ended system of extending ``env.py`` scripts via the command line. 
EnvironmentContext.get_x_argument

    Return the value(s) passed for the ``-x`` argument, if any.

    The ``-x`` argument is an open ended flag that allows any user-defined
    value or values to be passed on the command line, then available
    here for consumption by a custom ``env.py`` script.

    The return value is a list, returned directly from the ``argparse``
    structure. If ``as_dictionary=True`` is passed, the ``x`` arguments
    are parsed using ``key=value`` format into a dictionary that is
    then returned. If there is no ``=`` in the argument, value is an empty
    string.

    .. versionchanged:: 1.13.1 Support ``as_dictionary=True`` when
       arguments are passed without the ``=`` symbol.

    For example, to support passing a database URL on the command line,
    the standard ``env.py`` script can be modified like this::

        cmd_line_url = context.get_x_argument(
            as_dictionary=True).get('dbname')
        if cmd_line_url:
            engine = create_engine(cmd_line_url)
        else:
            engine = engine_from_config(
                    config.get_section(config.config_ini_section),
                    prefix='sqlalchemy.',
                    poolclass=pool.NullPool)

    This then takes effect by running the ``alembic`` script as::

        alembic -x dbname=postgresql://user:pass@host/dbname upgrade head

    This function does not require that the :class:`.MigrationContext`
    has been configured.

    .. seealso::

        :meth:`.EnvironmentContext.get_tag_argument`

        :attr:`.Config.cmd_opts`

EnvironmentContext.configure

    Configure a :class:`.MigrationContext` within this
    :class:`.EnvironmentContext` which will provide database
    connectivity and other configuration to a series of
    migration scripts.

    Many methods on :class:`.EnvironmentContext` require that
    this method has been called in order to function, as they
    ultimately need to have database access or at least access
    to the dialect in use. Those which do are documented as such.

    The important thing needed by :meth:`.configure` is a
    means to determine what kind of database dialect is in use.
    An actual connection to that database is needed only if
    the :class:`.MigrationContext` is to be used in
    "online" mode.

    If the :meth:`.is_offline_mode` function returns ``True``,
    then no connection is needed here. Otherwise, the
    ``connection`` parameter should be present as an
    instance of :class:`sqlalchemy.engine.Connection`.

    This function is typically called from the ``env.py``
    script within a migration environment. It can be called
    multiple times for an invocation. The most recent
    :class:`~sqlalchemy.engine.Connection`
    for which it was called is the one that will be operated upon
    by the next call to :meth:`.run_migrations`.

    General parameters:

    :param connection: a :class:`~sqlalchemy.engine.Connection`
     to use for SQL execution in "online" mode. When present, is also
     used to determine the type of dialect in use.
    :param url: a string database url, or a
     :class:`sqlalchemy.engine.url.URL` object.
     The type of dialect to be used will be derived from this if
     ``connection`` is not passed.
    :param dialect_name: string name of a dialect, such as
     "postgresql", "mssql", etc.
The type of dialect to be used will be derived from this if ``connection`` and ``url`` are not passed. :param dialect_opts: dictionary of options to be passed to dialect constructor. :param transactional_ddl: Force the usage of "transactional" DDL on or off; this otherwise defaults to whether or not the dialect in use supports it. :param transaction_per_migration: if True, nest each migration script in a transaction rather than the full series of migrations to run. :param output_buffer: a file-like object that will be used for textual output when the ``--sql`` option is used to generate SQL scripts. Defaults to ``sys.stdout`` if not passed here and also not present on the :class:`.Config` object. The value here overrides that of the :class:`.Config` object. :param output_encoding: when using ``--sql`` to generate SQL scripts, apply this encoding to the string output. :param literal_binds: when using ``--sql`` to generate SQL scripts, pass through the ``literal_binds`` flag to the compiler so that any literal values that would ordinarily be bound parameters are converted to plain strings. .. warning:: Dialects can typically only handle simple datatypes like strings and numbers for auto-literal generation. Datatypes like dates, intervals, and others may still require manual formatting, typically using :meth:`.Operations.inline_literal`. .. note:: the ``literal_binds`` flag is ignored on SQLAlchemy versions prior to 0.8 where this feature is not supported. .. seealso:: :meth:`.Operations.inline_literal` :param starting_rev: Override the "starting revision" argument when using ``--sql`` mode. :param tag: a string tag for usage by custom ``env.py`` scripts. Set via the ``--tag`` option, can be overridden here. :param template_args: dictionary of template arguments which will be added to the template argument environment when running the "revision" command. 
Note that the script environment is only run within the "revision" command if the --autogenerate option is used, or if the option "revision_environment=true" is present in the alembic.ini file. :param version_table: The name of the Alembic version table. The default is ``'alembic_version'``. :param version_table_schema: Optional schema to place version table within. :param version_table_pk: boolean, whether the Alembic version table should use a primary key constraint for the "value" column; this only takes effect when the table is first created. Defaults to True; setting to False should not be necessary and is here for backwards compatibility reasons. :param on_version_apply: a callable or collection of callables to be run for each migration step. The callables will be run in the order they are given, once for each migration step, after the respective operation has been applied but before its transaction is finalized. Each callable accepts no positional arguments and the following keyword arguments: * ``ctx``: the :class:`.MigrationContext` running the migration, * ``step``: a :class:`.MigrationInfo` representing the step currently being applied, * ``heads``: a collection of version strings representing the current heads, * ``run_args``: the ``**kwargs`` passed to :meth:`.run_migrations`. Parameters specific to the autogenerate feature, when ``alembic revision`` is run with the ``--autogenerate`` feature: :param target_metadata: a :class:`sqlalchemy.schema.MetaData` object, or a sequence of :class:`~sqlalchemy.schema.MetaData` objects, that will be consulted during autogeneration. The tables present in each :class:`~sqlalchemy.schema.MetaData` will be compared against what is locally available on the target :class:`~sqlalchemy.engine.Connection` to produce candidate upgrade/downgrade operations. :param compare_type: Indicates type comparison behavior during an autogenerate operation. 
Defaults to ``True`` turning on type comparison, which has good accuracy on most backends. See :ref:`compare_types` for an example as well as information on other type comparison options. Set to ``False`` which disables type comparison. A callable can also be passed to provide custom type comparison, see :ref:`compare_types` for additional details. .. versionchanged:: 1.12.0 The default value of :paramref:`.EnvironmentContext.configure.compare_type` has been changed to ``True``. .. seealso:: :ref:`compare_types` :paramref:`.EnvironmentContext.configure.compare_server_default` :param compare_server_default: Indicates server default comparison behavior during an autogenerate operation. Defaults to ``False`` which disables server default comparison. Set to ``True`` to turn on server default comparison, which has varied accuracy depending on backend. To customize server default comparison behavior, a callable may be specified which can filter server default comparisons during an autogenerate operation. defaults during an autogenerate operation. The format of this callable is:: def my_compare_server_default(context, inspected_column, metadata_column, inspected_default, metadata_default, rendered_metadata_default): # return True if the defaults are different, # False if not, or None to allow the default implementation # to compare these defaults return None context.configure( # ... compare_server_default = my_compare_server_default ) ``inspected_column`` is a dictionary structure as returned by :meth:`sqlalchemy.engine.reflection.Inspector.get_columns`, whereas ``metadata_column`` is a :class:`sqlalchemy.schema.Column` from the local model environment. A return value of ``None`` indicates to allow default server default comparison to proceed. Note that some backends such as Postgresql actually execute the two defaults on the database side to compare for equivalence. .. 
seealso:: :paramref:`.EnvironmentContext.configure.compare_type` :param include_name: A callable function which is given the chance to return ``True`` or ``False`` for any database reflected object based on its name, including database schema names when the :paramref:`.EnvironmentContext.configure.include_schemas` flag is set to ``True``. The function accepts the following positional arguments: * ``name``: the name of the object, such as schema name or table name. Will be ``None`` when indicating the default schema name of the database connection. * ``type``: a string describing the type of object; currently ``"schema"``, ``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``, or ``"foreign_key_constraint"`` * ``parent_names``: a dictionary of "parent" object names, that are relative to the name being given. Keys in this dictionary may include: ``"schema_name"``, ``"table_name"`` or ``"schema_qualified_table_name"``. E.g.:: def include_name(name, type_, parent_names): if type_ == "schema": return name in ["schema_one", "schema_two"] else: return True context.configure( # ... include_schemas = True, include_name = include_name ) .. seealso:: :ref:`autogenerate_include_hooks` :paramref:`.EnvironmentContext.configure.include_object` :paramref:`.EnvironmentContext.configure.include_schemas` :param include_object: A callable function which is given the chance to return ``True`` or ``False`` for any object, indicating if the given object should be considered in the autogenerate sweep. The function accepts the following positional arguments: * ``object``: a :class:`~sqlalchemy.schema.SchemaItem` object such as a :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.Column`, :class:`~sqlalchemy.schema.Index` :class:`~sqlalchemy.schema.UniqueConstraint`, or :class:`~sqlalchemy.schema.ForeignKeyConstraint` object * ``name``: the name of the object. This is typically available via ``object.name``. 
* ``type``: a string describing the type of object; currently ``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``, or ``"foreign_key_constraint"`` * ``reflected``: ``True`` if the given object was produced based on table reflection, ``False`` if it's from a local :class:`.MetaData` object. * ``compare_to``: the object being compared against, if available, else ``None``. E.g.:: def include_object(object, name, type_, reflected, compare_to): if (type_ == "column" and not reflected and object.info.get("skip_autogenerate", False)): return False else: return True context.configure( # ... include_object = include_object ) For the use case of omitting specific schemas from a target database when :paramref:`.EnvironmentContext.configure.include_schemas` is set to ``True``, the :attr:`~sqlalchemy.schema.Table.schema` attribute can be checked for each :class:`~sqlalchemy.schema.Table` object passed to the hook, however it is much more efficient to filter on schemas before reflection of objects takes place using the :paramref:`.EnvironmentContext.configure.include_name` hook. .. seealso:: :ref:`autogenerate_include_hooks` :paramref:`.EnvironmentContext.configure.include_name` :paramref:`.EnvironmentContext.configure.include_schemas` :param render_as_batch: if True, commands which alter elements within a table will be placed under a ``with batch_alter_table():`` directive, so that batch migrations will take place. .. seealso:: :ref:`batch_migrations` :param include_schemas: If True, autogenerate will scan across all schemas located by the SQLAlchemy :meth:`~sqlalchemy.engine.reflection.Inspector.get_schema_names` method, and include all differences in tables found across all those schemas. When using this option, you may want to also use the :paramref:`.EnvironmentContext.configure.include_name` parameter to specify a callable which can filter the tables/schemas that get included. .. 
seealso:: :ref:`autogenerate_include_hooks` :paramref:`.EnvironmentContext.configure.include_name` :paramref:`.EnvironmentContext.configure.include_object` :param render_item: Callable that can be used to override how any schema item, i.e. column, constraint, type, etc., is rendered for autogenerate. The callable receives a string describing the type of object, the object, and the autogen context. If it returns False, the default rendering method will be used. If it returns None, the item will not be rendered in the context of a Table construct, that is, can be used to skip columns or constraints within op.create_table():: def my_render_column(type_, col, autogen_context): if type_ == "column" and isinstance(col, MySpecialCol): return repr(col) else: return False context.configure( # ... render_item = my_render_column ) Available values for the type string include: ``"column"``, ``"primary_key"``, ``"foreign_key"``, ``"unique"``, ``"check"``, ``"type"``, ``"server_default"``. .. seealso:: :ref:`autogen_render_types` :param upgrade_token: When autogenerate completes, the text of the candidate upgrade operations will be present in this template variable when ``script.py.mako`` is rendered. Defaults to ``upgrades``. :param downgrade_token: When autogenerate completes, the text of the candidate downgrade operations will be present in this template variable when ``script.py.mako`` is rendered. Defaults to ``downgrades``. :param alembic_module_prefix: When autogenerate refers to Alembic :mod:`alembic.operations` constructs, this prefix will be used (i.e. ``op.create_table``) Defaults to "``op.``". Can be ``None`` to indicate no prefix. :param sqlalchemy_module_prefix: When autogenerate refers to SQLAlchemy :class:`~sqlalchemy.schema.Column` or type classes, this prefix will be used (i.e. ``sa.Column("somename", sa.Integer)``) Defaults to "``sa.``". Can be ``None`` to indicate no prefix. 
Note that when dialect-specific types are rendered, autogenerate will render them using the dialect module name, i.e. ``mssql.BIT()``, ``postgresql.UUID()``. :param user_module_prefix: When autogenerate refers to a SQLAlchemy type (e.g. :class:`.TypeEngine`) where the module name is not under the ``sqlalchemy`` namespace, this prefix will be used within autogenerate. If left at its default of ``None``, the ``__module__`` attribute of the type is used to render the import module. It's a good practice to set this and to have all custom types be available from a fixed module space, in order to future-proof migration files against reorganizations in modules. .. seealso:: :ref:`autogen_module_prefix` :param process_revision_directives: a callable function that will be passed a structure representing the end result of an autogenerate or plain "revision" operation, which can be manipulated to affect how the ``alembic revision`` command ultimately outputs new revision scripts. The structure of the callable is:: def process_revision_directives(context, revision, directives): pass The ``directives`` parameter is a Python list containing a single :class:`.MigrationScript` directive, which represents the revision file to be generated. This list as well as its contents may be freely modified to produce any set of commands. The section :ref:`customizing_revision` shows an example of doing this. The ``context`` parameter is the :class:`.MigrationContext` in use, and ``revision`` is a tuple of revision identifiers representing the current revision of the database. The callable is invoked at all times when the ``--autogenerate`` option is passed to ``alembic revision``. 
     If ``--autogenerate`` is not passed, the callable is invoked only
     if the ``revision_environment`` variable is set to True in the
     Alembic configuration, in which case the given ``directives``
     collection will contain empty :class:`.UpgradeOps` and
     :class:`.DowngradeOps` collections for ``.upgrade_ops`` and
     ``.downgrade_ops``. The ``--autogenerate`` option itself can be
     inferred by inspecting ``context.config.cmd_opts.autogenerate``.

     The callable function may optionally be an instance of
     a :class:`.Rewriter` object. This is a helper object that
     assists in the production of autogenerate-stream rewriter
     functions.

     .. seealso::

         :ref:`customizing_revision`

         :ref:`autogen_rewriter`

         :paramref:`.command.revision.process_revision_directives`

    Parameters specific to individual backends:

    :param mssql_batch_separator: The "batch separator" which will
     be placed between each statement when generating offline SQL Server
     migrations. Defaults to ``GO``. Note this is in addition to the
     customary semicolon ``;`` at the end of each statement; SQL Server
     considers the "batch separator" to denote the end of an
     individual statement execution, and cannot group certain
     dependent operations in one step.
    :param oracle_batch_separator: The "batch separator" which will
     be placed between each statement when generating offline
     Oracle migrations. Defaults to ``/``. Oracle doesn't add a
     semicolon between statements like most other backends.
EnvironmentContext.run_migrations

    Run migrations as determined by the current command line
    configuration
    as well as versioning information present (or not) in the current
    database connection (if one is present).

    The function accepts optional ``**kw`` arguments. If these are
    passed, they are sent directly to the ``upgrade()`` and
    ``downgrade()``
    functions within each target revision file. By modifying the
    ``script.py.mako`` file so that the ``upgrade()`` and ``downgrade()``
    functions accept arguments, parameters can be passed here so that
    contextual information, usually information to identify a particular
    database in use, can be passed from a custom ``env.py`` script
    to the migration functions.

    This function requires that a :class:`.MigrationContext` has
    first been made available via :meth:`.configure`.

EnvironmentContext.execute

    Execute the given SQL using the current change context.

    The behavior of :meth:`.execute` is the same
    as that of :meth:`.Operations.execute`. Please see that
    function's documentation for full detail including
    caveats and limitations.

    This function requires that a :class:`.MigrationContext` has
    first been made available via :meth:`.configure`.

EnvironmentContext.static_output

    Emit text directly to the "offline" SQL stream.

    Typically this is for emitting comments that
    start with --. The statement is not treated
    as a SQL execution, no ; or batch separator
    is added, etc.

EnvironmentContext.begin_transaction

    Return a context manager that will
    enclose an operation within a "transaction",
    as defined by the environment's offline
    and transactional DDL settings.

    e.g.::

        with context.begin_transaction():
            context.run_migrations()

    :meth:`.begin_transaction` is intended to
    "do the right thing" regardless of
    calling context:

    * If :meth:`.is_transactional_ddl` is ``False``,
      returns a "do nothing" context manager
      which otherwise produces no transactional
      state or directives.
    * If :meth:`.is_offline_mode` is ``True``,
      returns a context manager that will
      invoke the :meth:`.DefaultImpl.emit_begin`
      and :meth:`.DefaultImpl.emit_commit`
      methods, which will produce the string
      directives ``BEGIN`` and ``COMMIT`` on
      the output stream, as rendered by the
      target backend (e.g. SQL Server would
      emit ``BEGIN TRANSACTION``).
    * Otherwise, calls :meth:`sqlalchemy.engine.Connection.begin`
      on the current online connection, which
      returns a :class:`sqlalchemy.engine.Transaction`
      object. This object demarcates a real
      transaction and is itself a context manager,
      which will roll back if an exception
      is raised.

    Note that a custom ``env.py`` script which
    has more specific transactional needs can of course
    manipulate the :class:`~sqlalchemy.engine.Connection`
    directly to produce transactional state in "online"
    mode.

EnvironmentContext.get_context

    Return the current :class:`.MigrationContext` object.

    If :meth:`.EnvironmentContext.configure` has not been
    called yet, raises an exception.

EnvironmentContext.get_bind

    Return the current 'bind'.

    In "online" mode, this is the
    :class:`sqlalchemy.engine.Connection` currently being used
    to emit SQL to the database.

    This function requires that a :class:`.MigrationContext`
    has first been made available via :meth:`.configure`.
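The three cases listed for :meth:`.begin_transaction` can be sketched as a small selection function. This is a simplified illustration, not Alembic's implementation: the names ``choose_transaction`` and ``_emit_begin_commit`` are hypothetical, the ``BEGIN;``/``COMMIT;`` strings stand in for whatever the target backend would render, and the online branch is left unimplemented because it needs a real SQLAlchemy connection.

```python
import io
from contextlib import contextmanager, nullcontext
from typing import ContextManager, Iterator, TextIO


@contextmanager
def _emit_begin_commit(out: TextIO) -> Iterator[None]:
    """Write BEGIN/COMMIT directives around the block (offline mode)."""
    out.write("BEGIN;\n")
    yield
    out.write("COMMIT;\n")


def choose_transaction(
    transactional_ddl: bool, offline: bool, out: TextIO
) -> ContextManager[None]:
    """Pick a context manager following the three documented cases.

    Hypothetical helper for illustration only.
    """
    if not transactional_ddl:
        # Case 1: a "do nothing" context manager.
        return nullcontext()
    if offline:
        # Case 2: emit string directives on the output stream.
        return _emit_begin_commit(out)
    # Case 3 would call Connection.begin() on the online connection.
    raise NotImplementedError("online mode needs a real connection")


# Offline mode with transactional DDL: the statement is wrapped.
buf = io.StringIO()
with choose_transaction(True, True, buf):
    buf.write("CREATE TABLE t (id INTEGER);\n")

# Non-transactional DDL: nothing is added around the statement.
plain = io.StringIO()
with choose_transaction(False, True, plain):
    plain.write("CREATE TABLE t (id INTEGER);\n")
```

The point of the sketch is that the calling code uses the same ``with`` block in every case, and only the returned context manager differs.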
# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
# mypy: no-warn-return-any, allow-any-generics

from __future__ import annotations

from contextlib import contextmanager
from contextlib import nullcontext
import logging
import sys
from typing import Any
from typing import Callable
from typing import cast
from typing import Collection
from typing import Dict
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Set
from typing import Tuple
from typing import TYPE_CHECKING
from typing import Union

from sqlalchemy import Column
from sqlalchemy import literal_column
from sqlalchemy import select
from sqlalchemy.engine import Engine
from sqlalchemy.engine import url as sqla_url
from sqlalchemy.engine.strategies import MockEngineStrategy
from typing_extensions import ContextManager

from .. import ddl
from .. import util
from ..util import sqla_compat
from ..util.compat import EncodedIO

if TYPE_CHECKING:
    from sqlalchemy.engine import Dialect
    from sqlalchemy.engine import URL
    from sqlalchemy.engine.base import Connection
    from sqlalchemy.engine.base import Transaction
    from sqlalchemy.engine.mock import MockConnection
    from sqlalchemy.sql import Executable

    from .environment import EnvironmentContext
    from ..config import Config
    from ..script.base import Script
    from ..script.base import ScriptDirectory
    from ..script.revision import _RevisionOrBase
    from ..script.revision import Revision
    from ..script.revision import RevisionMap

log = logging.getLogger(__name__)


class _ProxyTransaction:
    def __init__(self, migration_context: MigrationContext) -> None:
        self.migration_context = migration_context

    @property
    def _proxied_transaction(self) -> Optional[Transaction]:
        return self.migration_context._transaction

    def rollback(self) -> None:
        t = self._proxied_transaction
        assert t is not None
        t.rollback()
        self.migration_context._transaction = None

    def commit(self) -> None:
        t = self._proxied_transaction
        assert t is not None
        t.commit()
        self.migration_context._transaction = None

    def __enter__(self) -> _ProxyTransaction:
        return self

    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
        if self._proxied_transaction is not None:
            self._proxied_transaction.__exit__(type_, value, traceback)
            self.migration_context._transaction = None


class MigrationContext:
    """Represent the database state made available to a migration
    script.

    :class:`.MigrationContext` is the front end to an actual
    database connection, or alternatively a string output
    stream given a particular database dialect,
    from an Alembic perspective.

    When inside the ``env.py`` script, the :class:`.MigrationContext`
    is available via the
    :meth:`.EnvironmentContext.get_context` method,
    which is available at ``alembic.context``::

        # from within env.py script
        from alembic import context

        migration_context = context.get_context()

    For usage outside of an ``env.py`` script, such as for
    utility routines that want to check the current version
    in the database, use the :meth:`.MigrationContext.configure`
    method to create new :class:`.MigrationContext` objects.
    For example, to get at the current revision in the
    database using :meth:`.MigrationContext.get_current_revision`::

        # in any application, outside of an env.py script
        from alembic.migration import MigrationContext
        from sqlalchemy import create_engine

        engine = create_engine("postgresql://mydatabase")
        conn = engine.connect()

        context = MigrationContext.configure(conn)
        current_rev = context.get_current_revision()

    The above context can also be used to produce
    Alembic migration operations with an :class:`.Operations`
    instance::

        # in any application, outside of the normal Alembic environment
        from alembic.operations import Operations

        op = Operations(context)
        op.alter_column("mytable", "somecolumn", nullable=True)

    """

    def __init__(
        self,
        dialect: Dialect,
        connection: Optional[Connection],
        opts: Dict[str, Any],
        environment_context: Optional[EnvironmentContext] = None,
    ) -> None:
        self.environment_context = environment_context
        self.opts = opts
        self.dialect = dialect
        self.script: Optional[ScriptDirectory] = opts.get("script")
        as_sql: bool = opts.get("as_sql", False)
        transactional_ddl = opts.get("transactional_ddl")
        self._transaction_per_migration = opts.get(
            "transaction_per_migration", False
        )
        self.on_version_apply_callbacks = opts.get("on_version_apply", ())
        self._transaction: Optional[Transaction] = None

        if as_sql:
            self.connection = cast(
                Optional["Connection"], self._stdout_connection(connection)
            )
            assert self.connection is not None
            self._in_external_transaction = False
        else:
            self.connection = connection
            self._in_external_transaction = (
                sqla_compat._get_connection_in_transaction(connection)
            )

        self._migrations_fn: Optional[
            Callable[..., Iterable[RevisionStep]]
        ] = opts.get("fn")
        self.as_sql = as_sql

        self.purge = opts.get("purge", False)

        if "output_encoding" in opts:
            self.output_buffer = EncodedIO(
                opts.get("output_buffer")
                or sys.stdout,  # type:ignore[arg-type]
                opts["output_encoding"],
            )
        else:
            self.output_buffer = opts.get("output_buffer", sys.stdout)

        self._user_compare_type = opts.get("compare_type", True)
        self._user_compare_server_default = opts.get(
            "compare_server_default", False
        )
        self.version_table = version_table = opts.get(
            "version_table", "alembic_version"
        )
        self.version_table_schema = version_table_schema = opts.get(
            "version_table_schema", None
        )

        self._start_from_rev: Optional[str] = opts.get("starting_rev")
        self.impl = ddl.DefaultImpl.get_by_dialect(dialect)(
            dialect,
            self.connection,
            self.as_sql,
            transactional_ddl,
            self.output_buffer,
            opts,
        )

        self._version = self.impl.version_table_impl(
            version_table=version_table,
            version_table_schema=version_table_schema,
            version_table_pk=opts.get("version_table_pk", True),
        )

        log.info("Context impl %s.", self.impl.__class__.__name__)
        if self.as_sql:
            log.info("Generating static SQL")
        log.info(
            "Will assume %s DDL.",
            (
                "transactional"
                if self.impl.transactional_ddl
                else "non-transactional"
            ),
        )

    @classmethod
    def configure(
        cls,
        connection: Optional[Connection] = None,
        url: Optional[Union[str, URL]] = None,
        dialect_name: Optional[str] = None,
        dialect: Optional[Dialect] = None,
        environment_context: Optional[EnvironmentContext] = None,
        dialect_opts: Optional[Dict[str, str]] = None,
        opts: Optional[Any] = None,
    ) -> MigrationContext:
        """Create a new :class:`.MigrationContext`.

        This is a factory method usually called
        by :meth:`.EnvironmentContext.configure`.

        :param connection: a :class:`~sqlalchemy.engine.Connection`
         to use for SQL execution in "online" mode.
         When present, is also used to determine the type of dialect
         in use.
        :param url: a string database url, or a
         :class:`sqlalchemy.engine.url.URL` object.
         The type of dialect to be used will be derived from this if
         ``connection`` is not passed.
        :param dialect_name: string name of a dialect, such as
         "postgresql", "mssql", etc. The type of dialect to be used will be
         derived from this if ``connection`` and ``url`` are not passed.
        :param opts: dictionary of options. Most other options
         accepted by :meth:`.EnvironmentContext.configure` are passed via
         this dictionary.

        """
        if opts is None:
            opts = {}
        if dialect_opts is None:
            dialect_opts = {}

        if connection:
            if isinstance(connection, Engine):
                raise util.CommandError(
                    "'connection' argument to configure() is expected "
                    "to be a sqlalchemy.engine.Connection instance, "
                    "got %r" % connection,
                )

            dialect = connection.dialect
        elif url:
            url_obj = sqla_url.make_url(url)
            dialect = url_obj.get_dialect()(**dialect_opts)
        elif dialect_name:
            url_obj = sqla_url.make_url("%s://" % dialect_name)
            dialect = url_obj.get_dialect()(**dialect_opts)
        elif not dialect:
            raise Exception("Connection, url, or dialect_name is required.")
        assert dialect is not None
        return MigrationContext(dialect, connection, opts, environment_context)

    @contextmanager
    def autocommit_block(self) -> Iterator[None]:
        """Enter an "autocommit" block, for databases that support AUTOCOMMIT
        isolation levels.

        This special directive is intended to support the occasional database
        DDL or system operation that specifically has to be run outside of
        any kind of transaction block. The PostgreSQL database platform
        is the most common target for this style of operation, as many
        of its DDL operations must be run outside of transaction blocks, even
        though the database overall supports transactional DDL.
The method is used as a context manager within a migration script, by calling on :meth:`.Operations.get_context` to retrieve the :class:`.MigrationContext`, then invoking :meth:`.MigrationContext.autocommit_block` using the ``with:`` statement:: def upgrade(): with op.get_context().autocommit_block(): op.execute("ALTER TYPE mood ADD VALUE 'soso'") Above, a PostgreSQL "ALTER TYPE..ADD VALUE" directive is emitted, which must be run outside of a transaction block at the database level. The :meth:`.MigrationContext.autocommit_block` method makes use of the SQLAlchemy ``AUTOCOMMIT`` isolation level setting, which against the psycopg2 DBAPI corresponds to the ``connection.autocommit`` setting, to ensure that the database driver is not inside of a DBAPI level transaction block. .. warning:: As is necessary, **the database transaction preceding the block is unconditionally committed**. This means that the run of migrations preceding the operation will be committed before the overall migration operation is complete. When an application includes migrations with "autocommit" blocks, it is recommended that :paramref:`.EnvironmentContext.configure.transaction_per_migration` be used, so that the calling environment is tuned to expect short per-file migrations whether or not one of them has an autocommit block. """ _in_connection_transaction = self._in_connection_transaction() if self.impl.transactional_ddl and self.as_sql: self.impl.emit_commit() elif _in_connection_transaction: assert self._transaction is not None self._transaction.commit() self._transaction = None if not self.as_sql: assert self.connection is not None current_level = self.connection.get_isolation_level() base_connection = self.connection # in 1.3 and 1.4 non-future mode, the connection gets switched # out. we can use the base connection with the new mode # except that it will not know it's in "autocommit" and will # emit deprecation warnings when an autocommit action takes # place. 
self.connection = self.impl.connection = ( base_connection.execution_options(isolation_level="AUTOCOMMIT") ) # sqlalchemy future mode will "autobegin" in any case, so take # control of that "transaction" here fake_trans: Optional[Transaction] = self.connection.begin() else: fake_trans = None try: yield finally: if not self.as_sql: assert self.connection is not None if fake_trans is not None: fake_trans.commit() self.connection.execution_options( isolation_level=current_level ) self.connection = self.impl.connection = base_connection if self.impl.transactional_ddl and self.as_sql: self.impl.emit_begin() elif _in_connection_transaction: assert self.connection is not None self._transaction = self.connection.begin() def begin_transaction( self, _per_migration: bool = False ) -> Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]: """Begin a logical transaction for migration operations. This method is used within an ``env.py`` script to demarcate where the outer "transaction" for a series of migrations begins. Example:: def run_migrations_online(): connectable = create_engine(...) with connectable.connect() as connection: context.configure( connection=connection, target_metadata=target_metadata ) with context.begin_transaction(): context.run_migrations() Above, :meth:`.MigrationContext.begin_transaction` is used to demarcate where the outer logical transaction occurs around the :meth:`.MigrationContext.run_migrations` operation. A "Logical" transaction means that the operation may or may not correspond to a real database transaction. If the target database supports transactional DDL (or :paramref:`.EnvironmentContext.configure.transactional_ddl` is true), the :paramref:`.EnvironmentContext.configure.transaction_per_migration` flag is not set, and the migration is against a real database connection (as opposed to using "offline" ``--sql`` mode), a real transaction will be started. 
If ``--sql`` mode is in effect, the operation would instead correspond to a string such as "BEGIN" being emitted to the string output. The returned object is a Python context manager that should only be used in the context of a ``with:`` statement as indicated above. The object has no other guaranteed API features present. .. seealso:: :meth:`.MigrationContext.autocommit_block` """ if self._in_external_transaction: return nullcontext() if self.impl.transactional_ddl: transaction_now = _per_migration == self._transaction_per_migration else: transaction_now = _per_migration is True if not transaction_now: return nullcontext() elif not self.impl.transactional_ddl: assert _per_migration if self.as_sql: return nullcontext() else: # track our own notion of a "transaction block", which must be # committed when complete. Don't rely upon whether or not the # SQLAlchemy connection reports as "in transaction"; this # because SQLAlchemy future connection features autobegin # behavior, so it may already be in a transaction from our # emitting of queries like "has_version_table", etc. While we # could track these operations as well, that leaves open the # possibility of new operations or other things happening in # the user environment that still may be triggering # "autobegin". in_transaction = self._transaction is not None if in_transaction: return nullcontext() else: assert self.connection is not None self._transaction = ( sqla_compat._safe_begin_connection_transaction( self.connection ) ) return _ProxyTransaction(self) elif self.as_sql: @contextmanager def begin_commit(): self.impl.emit_begin() yield self.impl.emit_commit() return begin_commit() else: assert self.connection is not None self._transaction = sqla_compat._safe_begin_connection_transaction( self.connection ) return _ProxyTransaction(self) def get_current_revision(self) -> Optional[str]: """Return the current revision, usually that which is present in the ``alembic_version`` table in the database. 
This method is intended to be used only for a migration stream that does not contain unmerged branches in the target database; if there are multiple branches present, an exception is raised. The :meth:`.MigrationContext.get_current_heads` method should be preferred over this method going forward in order to be compatible with branch migration support. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned instead, if any. """ heads = self.get_current_heads() if len(heads) == 0: return None elif len(heads) > 1: raise util.CommandError( "Version table '%s' has more than one head present; " "please use get_current_heads()" % self.version_table ) else: return heads[0] def get_current_heads(self) -> Tuple[str, ...]: """Return a tuple of the current 'head versions' that are represented in the target database. For a migration stream without branches, this will be a tuple containing a single value, synonymous with that of :meth:`.MigrationContext.get_current_revision`. However, when multiple unmerged branches exist within the target database, the returned tuple will contain a value for each head. If this :class:`.MigrationContext` was configured in "offline" mode, that is with ``as_sql=True``, the ``starting_rev`` parameter is returned in a one-length tuple. If no version table is present, or if there are no revisions present, an empty tuple is returned. 
""" if self.as_sql: start_from_rev: Any = self._start_from_rev if start_from_rev == "base": start_from_rev = None elif start_from_rev is not None and self.script: start_from_rev = [ self.script.get_revision(sfr).revision for sfr in util.to_list(start_from_rev) if sfr not in (None, "base") ] return util.to_tuple(start_from_rev, default=()) else: if self._start_from_rev: raise util.CommandError( "Can't specify current_rev to context " "when using a database connection" ) if not self._has_version_table(): return () assert self.connection is not None return tuple( row[0] for row in self.connection.execute( select(self._version.c.version_num) ) ) def _ensure_version_table(self, purge: bool = False) -> None: with sqla_compat._ensure_scope_for_ddl(self.connection): assert self.connection is not None self._version.create(self.connection, checkfirst=True) if purge: assert self.connection is not None self.connection.execute(self._version.delete()) def _has_version_table(self) -> bool: assert self.connection is not None return sqla_compat._connectable_has_table( self.connection, self.version_table, self.version_table_schema ) def stamp(self, script_directory: ScriptDirectory, revision: str) -> None: """Stamp the version table with a specific revision. This method calculates those branches to which the given revision can apply, and updates those branches as though they were migrated towards that revision (either up or down). If no current branches include the revision, it is added as a new branch head. """ heads = self.get_current_heads() if not self.as_sql and not heads: self._ensure_version_table() head_maintainer = HeadMaintainer(self, heads) for step in script_directory._stamp_revs(revision, heads): head_maintainer.update_to_step(step) def run_migrations(self, **kw: Any) -> None: r"""Run the migration scripts established for this :class:`.MigrationContext`, if any. 
The commands in :mod:`alembic.command` will set up a function that is ultimately passed to the :class:`.MigrationContext` as the ``fn`` argument. This function represents the "work" that will be done when :meth:`.MigrationContext.run_migrations` is called, typically from within the ``env.py`` script of the migration environment. The "work function" then provides an iterable of version callables and other version information which in the case of the ``upgrade`` or ``downgrade`` commands are the list of version scripts to invoke. Other commands yield nothing, in the case that a command wants to run some other operation against the database such as the ``current`` or ``stamp`` commands. :param \**kw: keyword arguments here will be passed to each migration callable, that is the ``upgrade()`` or ``downgrade()`` method within revision scripts. """ self.impl.start_migrations() heads: Tuple[str, ...] if self.purge: if self.as_sql: raise util.CommandError("Can't use --purge with --sql mode") self._ensure_version_table(purge=True) heads = () else: heads = self.get_current_heads() dont_mutate = self.opts.get("dont_mutate", False) if not self.as_sql and not heads and not dont_mutate: self._ensure_version_table() head_maintainer = HeadMaintainer(self, heads) assert self._migrations_fn is not None for step in self._migrations_fn(heads, self): with self.begin_transaction(_per_migration=True): if self.as_sql and not head_maintainer.heads: # for offline mode, include a CREATE TABLE from # the base assert self.connection is not None self._version.create(self.connection) log.info("Running %s", step) if self.as_sql: self.impl.static_output( "-- Running %s" % (step.short_log,) ) step.migration_fn(**kw) # previously, we wouldn't stamp per migration # if we were in a transaction, however given the more # complex model that involves any number of inserts # and row-targeted updates and deletes, it's simpler for now # just to run the operations on every version 
head_maintainer.update_to_step(step) for callback in self.on_version_apply_callbacks: callback( ctx=self, step=step.info, heads=set(head_maintainer.heads), run_args=kw, ) if self.as_sql and not head_maintainer.heads: assert self.connection is not None self._version.drop(self.connection) def _in_connection_transaction(self) -> bool: try: meth = self.connection.in_transaction # type:ignore[union-attr] except AttributeError: return False else: return meth() def execute( self, sql: Union[Executable, str], execution_options: Optional[Dict[str, Any]] = None, ) -> None: """Execute a SQL construct or string statement. The underlying execution mechanics are used, that is if this is "offline mode" the SQL is written to the output buffer, otherwise the SQL is emitted on the current SQLAlchemy connection. """ self.impl._exec(sql, execution_options) def _stdout_connection( self, connection: Optional[Connection] ) -> MockConnection: def dump(construct, *multiparams, **params): self.impl._exec(construct) return MockEngineStrategy.MockConnection(self.dialect, dump) @property def bind(self) -> Optional[Connection]: """Return the current "bind". In online mode, this is an instance of :class:`sqlalchemy.engine.Connection`, and is suitable for ad-hoc execution of any kind of usage described in SQLAlchemy Core documentation as well as for usage with the :meth:`sqlalchemy.schema.Table.create` and :meth:`sqlalchemy.schema.MetaData.create_all` methods of :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.MetaData`. Note that when "standard output" mode is enabled, this bind will be a "mock" connection handler that cannot return results and is only appropriate for a very limited subset of commands. 
""" return self.connection @property def config(self) -> Optional[Config]: """Return the :class:`.Config` used by the current environment, if any.""" if self.environment_context: return self.environment_context.config else: return None def _compare_type( self, inspector_column: Column[Any], metadata_column: Column ) -> bool: if self._user_compare_type is False: return False if callable(self._user_compare_type): user_value = self._user_compare_type( self, inspector_column, metadata_column, inspector_column.type, metadata_column.type, ) if user_value is not None: return user_value return self.impl.compare_type(inspector_column, metadata_column) def _compare_server_default( self, inspector_column: Column[Any], metadata_column: Column[Any], rendered_metadata_default: Optional[str], rendered_column_default: Optional[str], ) -> bool: if self._user_compare_server_default is False: return False if callable(self._user_compare_server_default): user_value = self._user_compare_server_default( self, inspector_column, metadata_column, rendered_column_default, metadata_column.server_default, rendered_metadata_default, ) if user_value is not None: return user_value return self.impl.compare_server_default( inspector_column, metadata_column, rendered_metadata_default, rendered_column_default, ) class HeadMaintainer: def __init__(self, context: MigrationContext, heads: Any) -> None: self.context = context self.heads = set(heads) def _insert_version(self, version: str) -> None: assert version not in self.heads self.heads.add(version) self.context.impl._exec( self.context._version.insert().values( version_num=literal_column("'%s'" % version) ) ) def _delete_version(self, version: str) -> None: self.heads.remove(version) ret = self.context.impl._exec( self.context._version.delete().where( self.context._version.c.version_num == literal_column("'%s'" % version) ) ) if ( not self.context.as_sql and self.context.dialect.supports_sane_rowcount and ret is not None and ret.rowcount != 1 ): 
raise util.CommandError( "Online migration expected to match one " "row when deleting '%s' in '%s'; " "%d found" % (version, self.context.version_table, ret.rowcount) ) def _update_version(self, from_: str, to_: str) -> None: assert to_ not in self.heads self.heads.remove(from_) self.heads.add(to_) ret = self.context.impl._exec( self.context._version.update() .values(version_num=literal_column("'%s'" % to_)) .where( self.context._version.c.version_num == literal_column("'%s'" % from_) ) ) if ( not self.context.as_sql and self.context.dialect.supports_sane_rowcount and ret is not None and ret.rowcount != 1 ): raise util.CommandError( "Online migration expected to match one " "row when updating '%s' to '%s' in '%s'; " "%d found" % (from_, to_, self.context.version_table, ret.rowcount) ) def update_to_step(self, step: Union[RevisionStep, StampStep]) -> None: if step.should_delete_branch(self.heads): vers = step.delete_version_num log.debug("branch delete %s", vers) self._delete_version(vers) elif step.should_create_branch(self.heads): vers = step.insert_version_num log.debug("new branch insert %s", vers) self._insert_version(vers) elif step.should_merge_branches(self.heads): # delete revs, update from rev, update to rev ( delete_revs, update_from_rev, update_to_rev, ) = step.merge_branch_idents(self.heads) log.debug( "merge, delete %s, update %s to %s", delete_revs, update_from_rev, update_to_rev, ) for delrev in delete_revs: self._delete_version(delrev) self._update_version(update_from_rev, update_to_rev) elif step.should_unmerge_branches(self.heads): ( update_from_rev, update_to_rev, insert_revs, ) = step.unmerge_branch_idents(self.heads) log.debug( "unmerge, insert %s, update %s to %s", insert_revs, update_from_rev, update_to_rev, ) for insrev in insert_revs: self._insert_version(insrev) self._update_version(update_from_rev, update_to_rev) else: from_, to_ = step.update_version_num(self.heads) log.debug("update %s to %s", from_, to_) self._update_version(from_, 
to_) class MigrationInfo: """Exposes information about a migration step to a callback listener. The :class:`.MigrationInfo` object is available exclusively for the benefit of the :paramref:`.EnvironmentContext.configure.on_version_apply` callback hook. """ is_upgrade: bool """True/False: indicates whether this operation ascends or descends the version tree.""" is_stamp: bool """True/False: indicates whether this operation is a stamp (i.e. whether it only updates the version table rather than invoking migration functions).""" up_revision_id: Optional[str] """Version string corresponding to :attr:`.Revision.revision`. In the case of a stamp operation, it is advised to use the :attr:`.MigrationInfo.up_revision_ids` tuple as a stamp operation can make a single movement from one or more branches down to a single branchpoint, in which case there will be multiple "up" revisions. .. seealso:: :attr:`.MigrationInfo.up_revision_ids` """ up_revision_ids: Tuple[str, ...] """Tuple of version strings corresponding to :attr:`.Revision.revision`. In the majority of cases, this tuple will be a single value, synonymous with the scalar value of :attr:`.MigrationInfo.up_revision_id`. It can be multiple revision identifiers only in the case of an ``alembic stamp`` operation which is moving downwards from multiple branches down to their common branch point. """ down_revision_ids: Tuple[str, ...] """Tuple of strings representing the base revisions of this migration step. If empty, this represents a root revision; otherwise, the first item corresponds to :attr:`.Revision.down_revision`, and the rest are inferred from dependencies. 
""" revision_map: RevisionMap """The revision map inside of which this operation occurs.""" def __init__( self, revision_map: RevisionMap, is_upgrade: bool, is_stamp: bool, up_revisions: Union[str, Tuple[str, ...]], down_revisions: Union[str, Tuple[str, ...]], ) -> None: self.revision_map = revision_map self.is_upgrade = is_upgrade self.is_stamp = is_stamp self.up_revision_ids = util.to_tuple(up_revisions, default=()) if self.up_revision_ids: self.up_revision_id = self.up_revision_ids[0] else: # this should never be the case with # "upgrade", "downgrade", or "stamp" as we are always # measuring movement in terms of at least one upgrade version self.up_revision_id = None self.down_revision_ids = util.to_tuple(down_revisions, default=()) @property def is_migration(self) -> bool: """True/False: indicates whether this operation is a migration. At present this is true if and only the migration is not a stamp. If other operation types are added in the future, both this attribute and :attr:`~.MigrationInfo.is_stamp` will be false. """ return not self.is_stamp @property def source_revision_ids(self) -> Tuple[str, ...]: """Active revisions before this migration step is applied.""" return ( self.down_revision_ids if self.is_upgrade else self.up_revision_ids ) @property def destination_revision_ids(self) -> Tuple[str, ...]: """Active revisions after this migration step is applied.""" return ( self.up_revision_ids if self.is_upgrade else self.down_revision_ids ) @property def up_revision(self) -> Optional[Revision]: """Get :attr:`~.MigrationInfo.up_revision_id` as a :class:`.Revision`. 
""" return self.revision_map.get_revision(self.up_revision_id) @property def up_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]: """Get :attr:`~.MigrationInfo.up_revision_ids` as a :class:`.Revision`.""" return self.revision_map.get_revisions(self.up_revision_ids) @property def down_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]: """Get :attr:`~.MigrationInfo.down_revision_ids` as a tuple of :class:`Revisions <.Revision>`.""" return self.revision_map.get_revisions(self.down_revision_ids) @property def source_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]: """Get :attr:`~MigrationInfo.source_revision_ids` as a tuple of :class:`Revisions <.Revision>`.""" return self.revision_map.get_revisions(self.source_revision_ids) @property def destination_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]: """Get :attr:`~MigrationInfo.destination_revision_ids` as a tuple of :class:`Revisions <.Revision>`.""" return self.revision_map.get_revisions(self.destination_revision_ids) class MigrationStep: from_revisions_no_deps: Tuple[str, ...] to_revisions_no_deps: Tuple[str, ...] is_upgrade: bool migration_fn: Any if TYPE_CHECKING: @property def doc(self) -> Optional[str]: ... 
@property def name(self) -> str: return self.migration_fn.__name__ @classmethod def upgrade_from_script( cls, revision_map: RevisionMap, script: Script ) -> RevisionStep: return RevisionStep(revision_map, script, True) @classmethod def downgrade_from_script( cls, revision_map: RevisionMap, script: Script ) -> RevisionStep: return RevisionStep(revision_map, script, False) @property def is_downgrade(self) -> bool: return not self.is_upgrade @property def short_log(self) -> str: return "%s %s -> %s" % ( self.name, util.format_as_comma(self.from_revisions_no_deps), util.format_as_comma(self.to_revisions_no_deps), ) def __str__(self): if self.doc: return "%s %s -> %s, %s" % ( self.name, util.format_as_comma(self.from_revisions_no_deps), util.format_as_comma(self.to_revisions_no_deps), self.doc, ) else: return self.short_log class RevisionStep(MigrationStep): def __init__( self, revision_map: RevisionMap, revision: Script, is_upgrade: bool ) -> None: self.revision_map = revision_map self.revision = revision self.is_upgrade = is_upgrade if is_upgrade: self.migration_fn = revision.module.upgrade else: self.migration_fn = revision.module.downgrade def __repr__(self): return "RevisionStep(%r, is_upgrade=%r)" % ( self.revision.revision, self.is_upgrade, ) def __eq__(self, other: object) -> bool: return ( isinstance(other, RevisionStep) and other.revision == self.revision and self.is_upgrade == other.is_upgrade ) @property def doc(self) -> Optional[str]: return self.revision.doc @property def from_revisions(self) -> Tuple[str, ...]: if self.is_upgrade: return self.revision._normalized_down_revisions else: return (self.revision.revision,) @property def from_revisions_no_deps( # type:ignore[override] self, ) -> Tuple[str, ...]: if self.is_upgrade: return self.revision._versioned_down_revisions else: return (self.revision.revision,) @property def to_revisions(self) -> Tuple[str, ...]: if self.is_upgrade: return (self.revision.revision,) else: return 
self.revision._normalized_down_revisions @property def to_revisions_no_deps( # type:ignore[override] self, ) -> Tuple[str, ...]: if self.is_upgrade: return (self.revision.revision,) else: return self.revision._versioned_down_revisions @property def _has_scalar_down_revision(self) -> bool: return len(self.revision._normalized_down_revisions) == 1 def should_delete_branch(self, heads: Set[str]) -> bool: """A delete is when we are a. in a downgrade and b. we are going to the "base" or we are going to a version that is implied as a dependency on another version that is remaining. """ if not self.is_downgrade: return False if self.revision.revision not in heads: return False downrevs = self.revision._normalized_down_revisions if not downrevs: # is a base return True else: # determine what the ultimate "to_revisions" for an # unmerge would be. If there are none, then we're a delete. to_revisions = self._unmerge_to_revisions(heads) return not to_revisions def merge_branch_idents( self, heads: Set[str] ) -> Tuple[List[str], str, str]: other_heads = set(heads).difference(self.from_revisions) if other_heads: ancestors = { r.revision for r in self.revision_map._get_ancestor_nodes( self.revision_map.get_revisions(other_heads), check=False ) } from_revisions = list( set(self.from_revisions).difference(ancestors) ) else: from_revisions = list(self.from_revisions) return ( # delete revs, update from rev, update to rev list(from_revisions[0:-1]), from_revisions[-1], self.to_revisions[0], ) def _unmerge_to_revisions(self, heads: Set[str]) -> Tuple[str, ...]: other_heads = set(heads).difference([self.revision.revision]) if other_heads: ancestors = { r.revision for r in self.revision_map._get_ancestor_nodes( self.revision_map.get_revisions(other_heads), check=False ) } return tuple(set(self.to_revisions).difference(ancestors)) else: # for each revision we plan to return, compute its ancestors # (excluding self), and remove those from the final output since # they are already 
accounted for. ancestors = { r.revision for to_revision in self.to_revisions for r in self.revision_map._get_ancestor_nodes( self.revision_map.get_revisions(to_revision), check=False ) if r.revision != to_revision } return tuple(set(self.to_revisions).difference(ancestors)) def unmerge_branch_idents( self, heads: Set[str] ) -> Tuple[str, str, Tuple[str, ...]]: to_revisions = self._unmerge_to_revisions(heads) return ( # update from rev, update to rev, insert revs self.from_revisions[0], to_revisions[-1], to_revisions[0:-1], ) def should_create_branch(self, heads: Set[str]) -> bool: if not self.is_upgrade: return False downrevs = self.revision._normalized_down_revisions if not downrevs: # is a base return True else: # none of our downrevs are present, so... # we have to insert our version. This is true whether # or not there is only one downrev, or multiple (in the latter # case, we're a merge point.) if not heads.intersection(downrevs): return True else: return False def should_merge_branches(self, heads: Set[str]) -> bool: if not self.is_upgrade: return False downrevs = self.revision._normalized_down_revisions if len(downrevs) > 1 and len(heads.intersection(downrevs)) > 1: return True return False def should_unmerge_branches(self, heads: Set[str]) -> bool: if not self.is_downgrade: return False downrevs = self.revision._normalized_down_revisions if self.revision.revision in heads and len(downrevs) > 1: return True return False def update_version_num(self, heads: Set[str]) -> Tuple[str, str]: if not self._has_scalar_down_revision: downrev = heads.intersection( self.revision._normalized_down_revisions ) assert ( len(downrev) == 1 ), "Can't do an UPDATE because downrevision is ambiguous" down_revision = list(downrev)[0] else: down_revision = self.revision._normalized_down_revisions[0] if self.is_upgrade: return down_revision, self.revision.revision else: return self.revision.revision, down_revision @property def delete_version_num(self) -> str: return 
self.revision.revision @property def insert_version_num(self) -> str: return self.revision.revision @property def info(self) -> MigrationInfo: return MigrationInfo( revision_map=self.revision_map, up_revisions=self.revision.revision, down_revisions=self.revision._normalized_down_revisions, is_upgrade=self.is_upgrade, is_stamp=False, ) class StampStep(MigrationStep): def __init__( self, from_: Optional[Union[str, Collection[str]]], to_: Optional[Union[str, Collection[str]]], is_upgrade: bool, branch_move: bool, revision_map: Optional[RevisionMap] = None, ) -> None: self.from_: Tuple[str, ...] = util.to_tuple(from_, default=()) self.to_: Tuple[str, ...] = util.to_tuple(to_, default=()) self.is_upgrade = is_upgrade self.branch_move = branch_move self.migration_fn = self.stamp_revision self.revision_map = revision_map doc: Optional[str] = None def stamp_revision(self, **kw: Any) -> None: return None def __eq__(self, other): return ( isinstance(other, StampStep) and other.from_revisions == self.from_revisions and other.to_revisions == self.to_revisions and other.branch_move == self.branch_move and self.is_upgrade == other.is_upgrade ) @property def from_revisions(self): return self.from_ @property def to_revisions(self) -> Tuple[str, ...]: return self.to_ @property def from_revisions_no_deps( # type:ignore[override] self, ) -> Tuple[str, ...]: return self.from_ @property def to_revisions_no_deps( # type:ignore[override] self, ) -> Tuple[str, ...]: return self.to_ @property def delete_version_num(self) -> str: assert len(self.from_) == 1 return self.from_[0] @property def insert_version_num(self) -> str: assert len(self.to_) == 1 return self.to_[0] def update_version_num(self, heads: Set[str]) -> Tuple[str, str]: assert len(self.from_) == 1 assert len(self.to_) == 1 return self.from_[0], self.to_[0] def merge_branch_idents( self, heads: Union[Set[str], List[str]] ) -> Union[Tuple[List[Any], str, str], Tuple[List[str], str, str]]: return ( # delete revs, update from rev, 
update to rev list(self.from_[0:-1]), self.from_[-1], self.to_[0], ) def unmerge_branch_idents( self, heads: Set[str] ) -> Tuple[str, str, List[str]]: return ( # update from rev, update to rev, insert revs self.from_[0], self.to_[-1], list(self.to_[0:-1]), ) def should_delete_branch(self, heads: Set[str]) -> bool: # TODO: we probably need to look for self.to_ inside of heads, # in a similar manner as should_create_branch, however we have # no tests for this yet (stamp downgrades w/ branches) return self.is_downgrade and self.branch_move def should_create_branch(self, heads: Set[str]) -> Union[Set[str], bool]: return ( self.is_upgrade and (self.branch_move or set(self.from_).difference(heads)) and set(self.to_).difference(heads) ) def should_merge_branches(self, heads: Set[str]) -> bool: return len(self.from_) > 1 def should_unmerge_branches(self, heads: Set[str]) -> bool: return len(self.to_) > 1 @property def info(self) -> MigrationInfo: up, down = ( (self.to_, self.from_) if self.is_upgrade else (self.from_, self.to_) ) assert self.revision_map is not None return MigrationInfo( revision_map=self.revision_map, up_revisions=up, down_revisions=down, is_upgrade=self.is_upgrade, is_stamp=True, )
# environment.py
from __future__ import annotations from typing import Any from typing import Callable from typing import Collection from typing import Dict from typing import List from typing import Mapping from typing import MutableMapping from typing import Optional from typing import overload from typing import Sequence from typing import TextIO from typing import Tuple from typing import TYPE_CHECKING from typing import Union from sqlalchemy.sql.schema import Column from sqlalchemy.sql.schema import FetchedValue from typing_extensions import ContextManager from typing_extensions import Literal from .migration import _ProxyTransaction from .migration import MigrationContext from .. 
import util from ..operations import Operations from ..script.revision import _GetRevArg if TYPE_CHECKING: from sqlalchemy.engine import URL from sqlalchemy.engine.base import Connection from sqlalchemy.sql import Executable from sqlalchemy.sql.schema import MetaData from sqlalchemy.sql.schema import SchemaItem from sqlalchemy.sql.type_api import TypeEngine from .migration import MigrationInfo from ..autogenerate.api import AutogenContext from ..config import Config from ..ddl import DefaultImpl from ..operations.ops import MigrationScript from ..script.base import ScriptDirectory _RevNumber = Optional[Union[str, Tuple[str, ...]]] ProcessRevisionDirectiveFn = Callable[ [MigrationContext, _GetRevArg, List["MigrationScript"]], None ] RenderItemFn = Callable[ [str, Any, "AutogenContext"], Union[str, Literal[False]] ] NameFilterType = Literal[ "schema", "table", "column", "index", "unique_constraint", "foreign_key_constraint", ] NameFilterParentNames = MutableMapping[ Literal["schema_name", "table_name", "schema_qualified_table_name"], Optional[str], ] IncludeNameFn = Callable[ [Optional[str], NameFilterType, NameFilterParentNames], bool ] IncludeObjectFn = Callable[ [ "SchemaItem", Optional[str], NameFilterType, bool, Optional["SchemaItem"], ], bool, ] OnVersionApplyFn = Callable[ [MigrationContext, "MigrationInfo", Collection[Any], Mapping[str, Any]], None, ] CompareServerDefault = Callable[ [ MigrationContext, "Column[Any]", "Column[Any]", Optional[str], Optional[FetchedValue], Optional[str], ], Optional[bool], ] CompareType = Callable[ [ MigrationContext, "Column[Any]", "Column[Any]", "TypeEngine[Any]", "TypeEngine[Any]", ], Optional[bool], ] class EnvironmentContext(util.ModuleClsProxy): """A configurational facade made available in an ``env.py`` script. 
The :class:`.EnvironmentContext` acts as a *facade* to the more nuts-and-bolts objects of :class:`.MigrationContext` as well as certain aspects of :class:`.Config`, within the context of the ``env.py`` script that is invoked by most Alembic commands. :class:`.EnvironmentContext` is normally instantiated when a command in :mod:`alembic.command` is run. It then makes itself available in the ``alembic.context`` module for the scope of the command. From within an ``env.py`` script, the current :class:`.EnvironmentContext` is available by importing this module. :class:`.EnvironmentContext` also supports programmatic usage. At this level, it acts as a Python context manager, that is, is intended to be used using the ``with:`` statement. A typical use of :class:`.EnvironmentContext`:: from alembic.config import Config from alembic.script import ScriptDirectory config = Config() config.set_main_option("script_location", "myapp:migrations") script = ScriptDirectory.from_config(config) def my_function(rev, context): '''do something with revision "rev", which will be the current database revision, and "context", which is the MigrationContext that the env.py will create''' with EnvironmentContext( config, script, fn=my_function, as_sql=False, starting_rev="base", destination_rev="head", tag="sometag", ): script.run_env() The above script will invoke the ``env.py`` script within the migration environment. If and when ``env.py`` calls :meth:`.MigrationContext.run_migrations`, the ``my_function()`` function above will be called by the :class:`.MigrationContext`, given the context itself as well as the current revision in the database. .. note:: For most API usages other than full blown invocation of migration scripts, the :class:`.MigrationContext` and :class:`.ScriptDirectory` objects can be created and used directly. The :class:`.EnvironmentContext` object is *only* needed when you need to actually invoke the ``env.py`` module present in the migration environment. 
""" _migration_context: Optional[MigrationContext] = None config: Config = None # type:ignore[assignment] """An instance of :class:`.Config` representing the configuration file contents as well as other variables set programmatically within it.""" script: ScriptDirectory = None # type:ignore[assignment] """An instance of :class:`.ScriptDirectory` which provides programmatic access to version files within the ``versions/`` directory. """ def __init__( self, config: Config, script: ScriptDirectory, **kw: Any ) -> None: r"""Construct a new :class:`.EnvironmentContext`. :param config: a :class:`.Config` instance. :param script: a :class:`.ScriptDirectory` instance. :param \**kw: keyword options that will be ultimately passed along to the :class:`.MigrationContext` when :meth:`.EnvironmentContext.configure` is called. """ self.config = config self.script = script self.context_opts = kw def __enter__(self) -> EnvironmentContext: """Establish a context which provides a :class:`.EnvironmentContext` object to env.py scripts. The :class:`.EnvironmentContext` will be made available as ``from alembic import context``. """ self._install_proxy() return self def __exit__(self, *arg: Any, **kw: Any) -> None: self._remove_proxy() def is_offline_mode(self) -> bool: """Return True if the current migrations environment is running in "offline mode". This is ``True`` or ``False`` depending on the ``--sql`` flag passed. This function does not require that the :class:`.MigrationContext` has been configured. """ return self.context_opts.get("as_sql", False) # type: ignore[no-any-return] # noqa: E501 def is_transactional_ddl(self) -> bool: """Return True if the context is configured to expect a transactional DDL capable backend. This defaults to the type of database in use, and can be overridden by the ``transactional_ddl`` argument to :meth:`.configure` This function requires that a :class:`.MigrationContext` has first been made available via :meth:`.configure`. 
""" return self.get_context().impl.transactional_ddl def requires_connection(self) -> bool: return not self.is_offline_mode() def get_head_revision(self) -> _RevNumber: """Return the hex identifier of the 'head' script revision. If the script directory has multiple heads, this method raises a :class:`.CommandError`; :meth:`.EnvironmentContext.get_head_revisions` should be preferred. This function does not require that the :class:`.MigrationContext` has been configured. .. seealso:: :meth:`.EnvironmentContext.get_head_revisions` """ return self.script.as_revision_number("head") def get_head_revisions(self) -> _RevNumber: """Return the hex identifier of the 'heads' script revision(s). This returns a tuple containing the version number of all heads in the script directory. This function does not require that the :class:`.MigrationContext` has been configured. """ return self.script.as_revision_number("heads") def get_starting_revision_argument(self) -> _RevNumber: """Return the 'starting revision' argument, if the revision was passed using ``start:end``. This is only meaningful in "offline" mode. Returns ``None`` if no value is available or was configured. This function does not require that the :class:`.MigrationContext` has been configured. """ if self._migration_context is not None: return self.script.as_revision_number( self.get_context()._start_from_rev ) elif "starting_rev" in self.context_opts: return self.script.as_revision_number( self.context_opts["starting_rev"] ) else: # this should raise only in the case that a command # is being run where the "starting rev" is never applicable; # this is to catch scripts which rely upon this in # non-sql mode or similar raise util.CommandError( "No starting revision argument is available." ) def get_revision_argument(self) -> _RevNumber: """Get the 'destination' revision argument. This is typically the argument passed to the ``upgrade`` or ``downgrade`` command. 
If it was specified as ``head``, the actual version number is returned; if specified as ``base``, ``None`` is returned. This function does not require that the :class:`.MigrationContext` has been configured. """ return self.script.as_revision_number( self.context_opts["destination_rev"] ) def get_tag_argument(self) -> Optional[str]: """Return the value passed for the ``--tag`` argument, if any. The ``--tag`` argument is not used directly by Alembic, but is available for custom ``env.py`` configurations that wish to use it; particularly for offline generation scripts that wish to generate tagged filenames. This function does not require that the :class:`.MigrationContext` has been configured. .. seealso:: :meth:`.EnvironmentContext.get_x_argument` - a newer and more open ended system of extending ``env.py`` scripts via the command line. """ return self.context_opts.get("tag", None) # type: ignore[no-any-return] # noqa: E501 @overload def get_x_argument(self, as_dictionary: Literal[False]) -> List[str]: ... @overload def get_x_argument( self, as_dictionary: Literal[True] ) -> Dict[str, str]: ... @overload def get_x_argument( self, as_dictionary: bool = ... ) -> Union[List[str], Dict[str, str]]: ... def get_x_argument( self, as_dictionary: bool = False ) -> Union[List[str], Dict[str, str]]: """Return the value(s) passed for the ``-x`` argument, if any. The ``-x`` argument is an open ended flag that allows any user-defined value or values to be passed on the command line, then available here for consumption by a custom ``env.py`` script. The return value is a list, returned directly from the ``argparse`` structure. If ``as_dictionary=True`` is passed, the ``x`` arguments are parsed using ``key=value`` format into a dictionary that is then returned. If there is no ``=`` in the argument, value is an empty string. .. versionchanged:: 1.13.1 Support ``as_dictionary=True`` when arguments are passed without the ``=`` symbol. 
For example, to support passing a database URL on the command line, the standard ``env.py`` script can be modified like this:: cmd_line_url = context.get_x_argument( as_dictionary=True).get('dbname') if cmd_line_url: engine = create_engine(cmd_line_url) else: engine = engine_from_config( config.get_section(config.config_ini_section), prefix='sqlalchemy.', poolclass=pool.NullPool) This then takes effect by running the ``alembic`` script as:: alembic -x dbname=postgresql://user:pass@host/dbname upgrade head This function does not require that the :class:`.MigrationContext` has been configured. .. seealso:: :meth:`.EnvironmentContext.get_tag_argument` :attr:`.Config.cmd_opts` """ if self.config.cmd_opts is not None: value = self.config.cmd_opts.x or [] else: value = [] if as_dictionary: dict_value = {} for arg in value: x_key, _, x_value = arg.partition("=") dict_value[x_key] = x_value value = dict_value return value def configure( self, connection: Optional[Connection] = None, url: Optional[Union[str, URL]] = None, dialect_name: Optional[str] = None, dialect_opts: Optional[Dict[str, Any]] = None, transactional_ddl: Optional[bool] = None, transaction_per_migration: bool = False, output_buffer: Optional[TextIO] = None, starting_rev: Optional[str] = None, tag: Optional[str] = None, template_args: Optional[Dict[str, Any]] = None, render_as_batch: bool = False, target_metadata: Union[MetaData, Sequence[MetaData], None] = None, include_name: Optional[IncludeNameFn] = None, include_object: Optional[IncludeObjectFn] = None, include_schemas: bool = False, process_revision_directives: Optional[ ProcessRevisionDirectiveFn ] = None, compare_type: Union[bool, CompareType] = True, compare_server_default: Union[bool, CompareServerDefault] = False, render_item: Optional[RenderItemFn] = None, literal_binds: bool = False, upgrade_token: str = "upgrades", downgrade_token: str = "downgrades", alembic_module_prefix: str = "op.", sqlalchemy_module_prefix: str = "sa.", user_module_prefix: 
Optional[str] = None, on_version_apply: Optional[OnVersionApplyFn] = None, **kw: Any, ) -> None: """Configure a :class:`.MigrationContext` within this :class:`.EnvironmentContext` which will provide database connectivity and other configuration to a series of migration scripts. Many methods on :class:`.EnvironmentContext` require that this method has been called in order to function, as they ultimately need to have database access or at least access to the dialect in use. Those which do are documented as such. The important thing needed by :meth:`.configure` is a means to determine what kind of database dialect is in use. An actual connection to that database is needed only if the :class:`.MigrationContext` is to be used in "online" mode. If the :meth:`.is_offline_mode` function returns ``True``, then no connection is needed here. Otherwise, the ``connection`` parameter should be present as an instance of :class:`sqlalchemy.engine.Connection`. This function is typically called from the ``env.py`` script within a migration environment. It can be called multiple times for an invocation. The most recent :class:`~sqlalchemy.engine.Connection` for which it was called is the one that will be operated upon by the next call to :meth:`.run_migrations`. General parameters: :param connection: a :class:`~sqlalchemy.engine.Connection` to use for SQL execution in "online" mode. When present, is also used to determine the type of dialect in use. :param url: a string database url, or a :class:`sqlalchemy.engine.url.URL` object. The type of dialect to be used will be derived from this if ``connection`` is not passed. :param dialect_name: string name of a dialect, such as "postgresql", "mssql", etc. The type of dialect to be used will be derived from this if ``connection`` and ``url`` are not passed. :param dialect_opts: dictionary of options to be passed to dialect constructor. 
:param transactional_ddl: Force the usage of "transactional" DDL on or off; this otherwise defaults to whether or not the dialect in use supports it. :param transaction_per_migration: if True, nest each migration script in a transaction rather than the full series of migrations to run. :param output_buffer: a file-like object that will be used for textual output when the ``--sql`` option is used to generate SQL scripts. Defaults to ``sys.stdout`` if not passed here and also not present on the :class:`.Config` object. The value here overrides that of the :class:`.Config` object. :param output_encoding: when using ``--sql`` to generate SQL scripts, apply this encoding to the string output. :param literal_binds: when using ``--sql`` to generate SQL scripts, pass through the ``literal_binds`` flag to the compiler so that any literal values that would ordinarily be bound parameters are converted to plain strings. .. warning:: Dialects can typically only handle simple datatypes like strings and numbers for auto-literal generation. Datatypes like dates, intervals, and others may still require manual formatting, typically using :meth:`.Operations.inline_literal`. .. note:: the ``literal_binds`` flag is ignored on SQLAlchemy versions prior to 0.8 where this feature is not supported. .. seealso:: :meth:`.Operations.inline_literal` :param starting_rev: Override the "starting revision" argument when using ``--sql`` mode. :param tag: a string tag for usage by custom ``env.py`` scripts. Set via the ``--tag`` option, can be overridden here. :param template_args: dictionary of template arguments which will be added to the template argument environment when running the "revision" command. Note that the script environment is only run within the "revision" command if the --autogenerate option is used, or if the option "revision_environment=true" is present in the alembic.ini file. :param version_table: The name of the Alembic version table. The default is ``'alembic_version'``. 
:param version_table_schema: Optional schema to place version table within. :param version_table_pk: boolean, whether the Alembic version table should use a primary key constraint for the "value" column; this only takes effect when the table is first created. Defaults to True; setting to False should not be necessary and is here for backwards compatibility reasons. :param on_version_apply: a callable or collection of callables to be run for each migration step. The callables will be run in the order they are given, once for each migration step, after the respective operation has been applied but before its transaction is finalized. Each callable accepts no positional arguments and the following keyword arguments: * ``ctx``: the :class:`.MigrationContext` running the migration, * ``step``: a :class:`.MigrationInfo` representing the step currently being applied, * ``heads``: a collection of version strings representing the current heads, * ``run_args``: the ``**kwargs`` passed to :meth:`.run_migrations`. Parameters specific to the autogenerate feature, when ``alembic revision`` is run with the ``--autogenerate`` feature: :param target_metadata: a :class:`sqlalchemy.schema.MetaData` object, or a sequence of :class:`~sqlalchemy.schema.MetaData` objects, that will be consulted during autogeneration. The tables present in each :class:`~sqlalchemy.schema.MetaData` will be compared against what is locally available on the target :class:`~sqlalchemy.engine.Connection` to produce candidate upgrade/downgrade operations. :param compare_type: Indicates type comparison behavior during an autogenerate operation. Defaults to ``True`` turning on type comparison, which has good accuracy on most backends. See :ref:`compare_types` for an example as well as information on other type comparison options. Set to ``False`` which disables type comparison. A callable can also be passed to provide custom type comparison, see :ref:`compare_types` for additional details. .. 
versionchanged:: 1.12.0 The default value of
            :paramref:`.EnvironmentContext.configure.compare_type` has been
            changed to ``True``.

         .. seealso::

            :ref:`compare_types`

            :paramref:`.EnvironmentContext.configure.compare_server_default`

        :param compare_server_default: Indicates server default comparison
         behavior during an autogenerate operation. Defaults to ``False``
         which disables server default comparison. Set to ``True`` to turn
         on server default comparison, which has varied accuracy depending
         on backend.

         To customize server default comparison behavior, a callable may
         be specified which can filter server default comparisons during
         an autogenerate operation. The format of this callable is::

            def my_compare_server_default(context, inspected_column,
                        metadata_column, inspected_default, metadata_default,
                        rendered_metadata_default):
                # return True if the defaults are different,
                # False if not, or None to allow the default implementation
                # to compare these defaults
                return None

            context.configure(
                # ...
                compare_server_default = my_compare_server_default
            )

         ``inspected_column`` is a dictionary structure as returned by
         :meth:`sqlalchemy.engine.reflection.Inspector.get_columns`, whereas
         ``metadata_column`` is a :class:`sqlalchemy.schema.Column` from
         the local model environment.

         A return value of ``None`` indicates that the default server
         default comparison should proceed. Note that some backends such
         as PostgreSQL actually execute the two defaults on the database
         side to compare for equivalence.

         .. seealso::

            :paramref:`.EnvironmentContext.configure.compare_type`

        :param include_name: A callable function which is given
         the chance to return ``True`` or ``False`` for any database reflected
         object based on its name, including database schema names when
         the :paramref:`.EnvironmentContext.configure.include_schemas` flag
         is set to ``True``.
The function accepts the following positional arguments: * ``name``: the name of the object, such as schema name or table name. Will be ``None`` when indicating the default schema name of the database connection. * ``type``: a string describing the type of object; currently ``"schema"``, ``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``, or ``"foreign_key_constraint"`` * ``parent_names``: a dictionary of "parent" object names, that are relative to the name being given. Keys in this dictionary may include: ``"schema_name"``, ``"table_name"`` or ``"schema_qualified_table_name"``. E.g.:: def include_name(name, type_, parent_names): if type_ == "schema": return name in ["schema_one", "schema_two"] else: return True context.configure( # ... include_schemas = True, include_name = include_name ) .. seealso:: :ref:`autogenerate_include_hooks` :paramref:`.EnvironmentContext.configure.include_object` :paramref:`.EnvironmentContext.configure.include_schemas` :param include_object: A callable function which is given the chance to return ``True`` or ``False`` for any object, indicating if the given object should be considered in the autogenerate sweep. The function accepts the following positional arguments: * ``object``: a :class:`~sqlalchemy.schema.SchemaItem` object such as a :class:`~sqlalchemy.schema.Table`, :class:`~sqlalchemy.schema.Column`, :class:`~sqlalchemy.schema.Index` :class:`~sqlalchemy.schema.UniqueConstraint`, or :class:`~sqlalchemy.schema.ForeignKeyConstraint` object * ``name``: the name of the object. This is typically available via ``object.name``. * ``type``: a string describing the type of object; currently ``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``, or ``"foreign_key_constraint"`` * ``reflected``: ``True`` if the given object was produced based on table reflection, ``False`` if it's from a local :class:`.MetaData` object. * ``compare_to``: the object being compared against, if available, else ``None``. 
E.g.:: def include_object(object, name, type_, reflected, compare_to): if (type_ == "column" and not reflected and object.info.get("skip_autogenerate", False)): return False else: return True context.configure( # ... include_object = include_object ) For the use case of omitting specific schemas from a target database when :paramref:`.EnvironmentContext.configure.include_schemas` is set to ``True``, the :attr:`~sqlalchemy.schema.Table.schema` attribute can be checked for each :class:`~sqlalchemy.schema.Table` object passed to the hook, however it is much more efficient to filter on schemas before reflection of objects takes place using the :paramref:`.EnvironmentContext.configure.include_name` hook. .. seealso:: :ref:`autogenerate_include_hooks` :paramref:`.EnvironmentContext.configure.include_name` :paramref:`.EnvironmentContext.configure.include_schemas` :param render_as_batch: if True, commands which alter elements within a table will be placed under a ``with batch_alter_table():`` directive, so that batch migrations will take place. .. seealso:: :ref:`batch_migrations` :param include_schemas: If True, autogenerate will scan across all schemas located by the SQLAlchemy :meth:`~sqlalchemy.engine.reflection.Inspector.get_schema_names` method, and include all differences in tables found across all those schemas. When using this option, you may want to also use the :paramref:`.EnvironmentContext.configure.include_name` parameter to specify a callable which can filter the tables/schemas that get included. .. seealso:: :ref:`autogenerate_include_hooks` :paramref:`.EnvironmentContext.configure.include_name` :paramref:`.EnvironmentContext.configure.include_object` :param render_item: Callable that can be used to override how any schema item, i.e. column, constraint, type, etc., is rendered for autogenerate. The callable receives a string describing the type of object, the object, and the autogen context. If it returns False, the default rendering method will be used. 
If it returns None, the item will not be rendered in the context of a Table construct, that is, can be used to skip columns or constraints within op.create_table():: def my_render_column(type_, col, autogen_context): if type_ == "column" and isinstance(col, MySpecialCol): return repr(col) else: return False context.configure( # ... render_item = my_render_column ) Available values for the type string include: ``"column"``, ``"primary_key"``, ``"foreign_key"``, ``"unique"``, ``"check"``, ``"type"``, ``"server_default"``. .. seealso:: :ref:`autogen_render_types` :param upgrade_token: When autogenerate completes, the text of the candidate upgrade operations will be present in this template variable when ``script.py.mako`` is rendered. Defaults to ``upgrades``. :param downgrade_token: When autogenerate completes, the text of the candidate downgrade operations will be present in this template variable when ``script.py.mako`` is rendered. Defaults to ``downgrades``. :param alembic_module_prefix: When autogenerate refers to Alembic :mod:`alembic.operations` constructs, this prefix will be used (i.e. ``op.create_table``) Defaults to "``op.``". Can be ``None`` to indicate no prefix. :param sqlalchemy_module_prefix: When autogenerate refers to SQLAlchemy :class:`~sqlalchemy.schema.Column` or type classes, this prefix will be used (i.e. ``sa.Column("somename", sa.Integer)``) Defaults to "``sa.``". Can be ``None`` to indicate no prefix. Note that when dialect-specific types are rendered, autogenerate will render them using the dialect module name, i.e. ``mssql.BIT()``, ``postgresql.UUID()``. :param user_module_prefix: When autogenerate refers to a SQLAlchemy type (e.g. :class:`.TypeEngine`) where the module name is not under the ``sqlalchemy`` namespace, this prefix will be used within autogenerate. If left at its default of ``None``, the ``__module__`` attribute of the type is used to render the import module. 
It's a good practice to set this and to have all custom types be available from a fixed module space, in order to future-proof migration files against reorganizations in modules. .. seealso:: :ref:`autogen_module_prefix` :param process_revision_directives: a callable function that will be passed a structure representing the end result of an autogenerate or plain "revision" operation, which can be manipulated to affect how the ``alembic revision`` command ultimately outputs new revision scripts. The structure of the callable is:: def process_revision_directives(context, revision, directives): pass The ``directives`` parameter is a Python list containing a single :class:`.MigrationScript` directive, which represents the revision file to be generated. This list as well as its contents may be freely modified to produce any set of commands. The section :ref:`customizing_revision` shows an example of doing this. The ``context`` parameter is the :class:`.MigrationContext` in use, and ``revision`` is a tuple of revision identifiers representing the current revision of the database. The callable is invoked at all times when the ``--autogenerate`` option is passed to ``alembic revision``. If ``--autogenerate`` is not passed, the callable is invoked only if the ``revision_environment`` variable is set to True in the Alembic configuration, in which case the given ``directives`` collection will contain empty :class:`.UpgradeOps` and :class:`.DowngradeOps` collections for ``.upgrade_ops`` and ``.downgrade_ops``. The ``--autogenerate`` option itself can be inferred by inspecting ``context.config.cmd_opts.autogenerate``. The callable function may optionally be an instance of a :class:`.Rewriter` object. This is a helper object that assists in the production of autogenerate-stream rewriter functions. .. 
seealso:: :ref:`customizing_revision` :ref:`autogen_rewriter` :paramref:`.command.revision.process_revision_directives` Parameters specific to individual backends: :param mssql_batch_separator: The "batch separator" which will be placed between each statement when generating offline SQL Server migrations. Defaults to ``GO``. Note this is in addition to the customary semicolon ``;`` at the end of each statement; SQL Server considers the "batch separator" to denote the end of an individual statement execution, and cannot group certain dependent operations in one step. :param oracle_batch_separator: The "batch separator" which will be placed between each statement when generating offline Oracle migrations. Defaults to ``/``. Oracle doesn't add a semicolon between statements like most other backends. """ opts = self.context_opts if transactional_ddl is not None: opts["transactional_ddl"] = transactional_ddl if output_buffer is not None: opts["output_buffer"] = output_buffer elif self.config.output_buffer is not None: opts["output_buffer"] = self.config.output_buffer if starting_rev: opts["starting_rev"] = starting_rev if tag: opts["tag"] = tag if template_args and "template_args" in opts: opts["template_args"].update(template_args) opts["transaction_per_migration"] = transaction_per_migration opts["target_metadata"] = target_metadata opts["include_name"] = include_name opts["include_object"] = include_object opts["include_schemas"] = include_schemas opts["render_as_batch"] = render_as_batch opts["upgrade_token"] = upgrade_token opts["downgrade_token"] = downgrade_token opts["sqlalchemy_module_prefix"] = sqlalchemy_module_prefix opts["alembic_module_prefix"] = alembic_module_prefix opts["user_module_prefix"] = user_module_prefix opts["literal_binds"] = literal_binds opts["process_revision_directives"] = process_revision_directives opts["on_version_apply"] = util.to_tuple(on_version_apply, default=()) if render_item is not None: opts["render_item"] = render_item 
opts["compare_type"] = compare_type if compare_server_default is not None: opts["compare_server_default"] = compare_server_default opts["script"] = self.script opts.update(kw) self._migration_context = MigrationContext.configure( connection=connection, url=url, dialect_name=dialect_name, environment_context=self, dialect_opts=dialect_opts, opts=opts, ) def run_migrations(self, **kw: Any) -> None: """Run migrations as determined by the current command line configuration as well as versioning information present (or not) in the current database connection (if one is present). The function accepts optional ``**kw`` arguments. If these are passed, they are sent directly to the ``upgrade()`` and ``downgrade()`` functions within each target revision file. By modifying the ``script.py.mako`` file so that the ``upgrade()`` and ``downgrade()`` functions accept arguments, parameters can be passed here so that contextual information, usually information to identify a particular database in use, can be passed from a custom ``env.py`` script to the migration functions. This function requires that a :class:`.MigrationContext` has first been made available via :meth:`.configure`. """ assert self._migration_context is not None with Operations.context(self._migration_context): self.get_context().run_migrations(**kw) def execute( self, sql: Union[Executable, str], execution_options: Optional[Dict[str, Any]] = None, ) -> None: """Execute the given SQL using the current change context. The behavior of :meth:`.execute` is the same as that of :meth:`.Operations.execute`. Please see that function's documentation for full detail including caveats and limitations. This function requires that a :class:`.MigrationContext` has first been made available via :meth:`.configure`. """ self.get_context().execute(sql, execution_options=execution_options) def static_output(self, text: str) -> None: """Emit text directly to the "offline" SQL stream. 
Typically this is for emitting comments that start with --. The statement is not treated as a SQL execution, no ; or batch separator is added, etc. """ self.get_context().impl.static_output(text) def begin_transaction( self, ) -> Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]: """Return a context manager that will enclose an operation within a "transaction", as defined by the environment's offline and transactional DDL settings. e.g.:: with context.begin_transaction(): context.run_migrations() :meth:`.begin_transaction` is intended to "do the right thing" regardless of calling context: * If :meth:`.is_transactional_ddl` is ``False``, returns a "do nothing" context manager which otherwise produces no transactional state or directives. * If :meth:`.is_offline_mode` is ``True``, returns a context manager that will invoke the :meth:`.DefaultImpl.emit_begin` and :meth:`.DefaultImpl.emit_commit` methods, which will produce the string directives ``BEGIN`` and ``COMMIT`` on the output stream, as rendered by the target backend (e.g. SQL Server would emit ``BEGIN TRANSACTION``). * Otherwise, calls :meth:`sqlalchemy.engine.Connection.begin` on the current online connection, which returns a :class:`sqlalchemy.engine.Transaction` object. This object demarcates a real transaction and is itself a context manager, which will roll back if an exception is raised. Note that a custom ``env.py`` script which has more specific transactional needs can of course manipulate the :class:`~sqlalchemy.engine.Connection` directly to produce transactional state in "online" mode. """ return self.get_context().begin_transaction() def get_context(self) -> MigrationContext: """Return the current :class:`.MigrationContext` object. If :meth:`.EnvironmentContext.configure` has not been called yet, raises an exception. 
""" if self._migration_context is None: raise Exception("No context has been configured yet.") return self._migration_context def get_bind(self) -> Connection: """Return the current 'bind'. In "online" mode, this is the :class:`sqlalchemy.engine.Connection` currently being used to emit SQL to the database. This function requires that a :class:`.MigrationContext` has first been made available via :meth:`.configure`. """ return self.get_context().bind # type: ignore[return-value] def get_impl(self) -> DefaultImpl: return self.get_context().impl PK! __init__.pynu[PK!|%__pycache__/migration.cpython-312.pycnu[PK!'3__pycache__/environment.cpython-312.pycnu[PK!sԧ$__pycache__/__init__.cpython-312.pycnu[PK!m migration.pynu[PK!44Venvironment.pynu[PK!  __init__.pynu[PKE